Properly unlock the socket file lock in ansible-connection (#39223)
Also use a lock file per host, rather than one global file lock.
Commit 9c0275a879
introduced a bug where the lock file was only being
unlocked by the child process of the fork performed in ansible-connection.
This causes delays when a large inventory creates a lot of contention on
that global lock. This patch fixes the problem by ensuring the lock is
released regardless of the fork condition, and by using a lock file
based on the remote address of the target host, removing the global lock
bottleneck.
Fixes #38892
This commit is contained in:
parent
12f2b9506d
commit
7ce9968ce1
1 changed files with 57 additions and 46 deletions
|
@ -20,6 +20,8 @@ import traceback
|
||||||
import errno
|
import errno
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
from contextlib import contextmanager
|
||||||
|
|
||||||
from ansible import constants as C
|
from ansible import constants as C
|
||||||
from ansible.module_utils._text import to_bytes, to_native, to_text
|
from ansible.module_utils._text import to_bytes, to_native, to_text
|
||||||
from ansible.module_utils.six import PY3
|
from ansible.module_utils.six import PY3
|
||||||
|
@ -33,6 +35,21 @@ from ansible.utils.display import Display
|
||||||
from ansible.utils.jsonrpc import JsonRpcServer
|
from ansible.utils.jsonrpc import JsonRpcServer
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
def file_lock(lock_path):
    """
    Uses contextmanager to create and release a file lock based on the
    given path. This allows us to create locks using `with file_lock()`
    to prevent deadlocks related to failure to unlock properly.

    The lock file is created (mode 0o600) if it does not exist, and an
    exclusive ``fcntl.lockf`` lock is held for the duration of the
    ``with`` body. The lock is released and the descriptor closed even if
    the body raises, so a failure inside the block cannot leave the lock
    held or leak the file descriptor.

    :param lock_path: path of the file to use as the lock.
    :raises OSError: if the lock file cannot be opened or locked.
    """
    lock_fd = os.open(lock_path, os.O_RDWR | os.O_CREAT, 0o600)
    try:
        fcntl.lockf(lock_fd, fcntl.LOCK_EX)
        try:
            yield
        finally:
            # Runs on normal exit and when the with-body raises (the
            # exception is re-raised at the yield by contextmanager).
            fcntl.lockf(lock_fd, fcntl.LOCK_UN)
    finally:
        # Always close the descriptor, including when acquiring the lock
        # itself failed.
        os.close(lock_fd)
|
||||||
|
|
||||||
|
|
||||||
class ConnectionProcess(object):
|
class ConnectionProcess(object):
|
||||||
'''
|
'''
|
||||||
The connection process wraps around a Connection object that manages
|
The connection process wraps around a Connection object that manages
|
||||||
|
@ -209,60 +226,54 @@ def main():
|
||||||
tmp_path = unfrackpath(C.PERSISTENT_CONTROL_PATH_DIR)
|
tmp_path = unfrackpath(C.PERSISTENT_CONTROL_PATH_DIR)
|
||||||
makedirs_safe(tmp_path)
|
makedirs_safe(tmp_path)
|
||||||
|
|
||||||
lock_path = unfrackpath("%s/.ansible_pc_lock" % tmp_path)
|
lock_path = unfrackpath("%s/.ansible_pc_lock_%s" % (tmp_path, play_context.remote_addr))
|
||||||
socket_path = unfrackpath(cp % dict(directory=tmp_path))
|
socket_path = unfrackpath(cp % dict(directory=tmp_path))
|
||||||
|
|
||||||
# if the socket file doesn't exist, spin up the daemon process
|
with file_lock(lock_path):
|
||||||
lock_fd = os.open(lock_path, os.O_RDWR | os.O_CREAT, 0o600)
|
if not os.path.exists(socket_path):
|
||||||
fcntl.lockf(lock_fd, fcntl.LOCK_EX)
|
messages.append('local domain socket does not exist, starting it')
|
||||||
|
original_path = os.getcwd()
|
||||||
|
r, w = os.pipe()
|
||||||
|
pid = fork_process()
|
||||||
|
|
||||||
if not os.path.exists(socket_path):
|
if pid == 0:
|
||||||
messages.append('local domain socket does not exist, starting it')
|
try:
|
||||||
original_path = os.getcwd()
|
os.close(r)
|
||||||
r, w = os.pipe()
|
wfd = os.fdopen(w, 'w')
|
||||||
pid = fork_process()
|
process = ConnectionProcess(wfd, play_context, socket_path, original_path, ansible_playbook_pid)
|
||||||
|
process.start()
|
||||||
|
except Exception:
|
||||||
|
messages.append(traceback.format_exc())
|
||||||
|
rc = 1
|
||||||
|
|
||||||
if pid == 0:
|
if rc == 0:
|
||||||
try:
|
process.run()
|
||||||
os.close(r)
|
|
||||||
wfd = os.fdopen(w, 'w')
|
|
||||||
process = ConnectionProcess(wfd, play_context, socket_path, original_path, ansible_playbook_pid)
|
|
||||||
process.start()
|
|
||||||
except Exception:
|
|
||||||
messages.append(traceback.format_exc())
|
|
||||||
rc = 1
|
|
||||||
|
|
||||||
fcntl.lockf(lock_fd, fcntl.LOCK_UN)
|
sys.exit(rc)
|
||||||
os.close(lock_fd)
|
|
||||||
|
|
||||||
if rc == 0:
|
else:
|
||||||
process.run()
|
os.close(w)
|
||||||
|
rfd = os.fdopen(r, 'r')
|
||||||
sys.exit(rc)
|
data = json.loads(rfd.read())
|
||||||
|
messages.extend(data.pop('messages'))
|
||||||
|
result.update(data)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
os.close(w)
|
messages.append('found existing local domain socket, using it!')
|
||||||
rfd = os.fdopen(r, 'r')
|
conn = Connection(socket_path)
|
||||||
data = json.loads(rfd.read())
|
pc_data = to_text(init_data)
|
||||||
messages.extend(data.pop('messages'))
|
try:
|
||||||
result.update(data)
|
messages.extend(conn.update_play_context(pc_data))
|
||||||
|
except Exception as exc:
|
||||||
else:
|
# Only network_cli has update_play context, so missing this is
|
||||||
messages.append('found existing local domain socket, using it!')
|
# not fatal e.g. netconf
|
||||||
conn = Connection(socket_path)
|
if isinstance(exc, ConnectionError) and getattr(exc, 'code', None) == -32601:
|
||||||
pc_data = to_text(init_data)
|
pass
|
||||||
try:
|
else:
|
||||||
messages.extend(conn.update_play_context(pc_data))
|
result.update({
|
||||||
except Exception as exc:
|
'error': to_text(exc),
|
||||||
# Only network_cli has update_play context, so missing this is
|
'exception': traceback.format_exc()
|
||||||
# not fatal e.g. netconf
|
})
|
||||||
if isinstance(exc, ConnectionError) and getattr(exc, 'code', None) == -32601:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
result.update({
|
|
||||||
'error': to_text(exc),
|
|
||||||
'exception': traceback.format_exc()
|
|
||||||
})
|
|
||||||
|
|
||||||
messages.append(sys.stdout.getvalue())
|
messages.append(sys.stdout.getvalue())
|
||||||
result.update({
|
result.update({
|
||||||
|
|
Loading…
Add table
Reference in a new issue