Introduce a connection locking infrastructure
The lock file is (a temporary file) opened in the parent process, whose open fd is inherited by the workers after fork, and passed down through the PlayContext. Connection grows lock/unlock methods which can be used by individual connection plugins.
This commit is contained in:
parent
88d3751c28
commit
5887e96b27
3 changed files with 21 additions and 0 deletions
|
@ -23,6 +23,7 @@ import multiprocessing
|
|||
import os
|
||||
import socket
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
from ansible import constants as C
|
||||
from ansible.errors import AnsibleError
|
||||
|
@ -78,6 +79,10 @@ class TaskQueueManager:
|
|||
self._failed_hosts = dict()
|
||||
self._unreachable_hosts = dict()
|
||||
|
||||
# A temporary file (opened pre-fork) used by connection plugins for
|
||||
# inter-process locking.
|
||||
self._options.connection_lockfile = tempfile.TemporaryFile()
|
||||
|
||||
self._final_q = multiprocessing.Queue()
|
||||
|
||||
# create the pool of worker threads, based on the number of forks specified
|
||||
|
|
|
@ -161,6 +161,7 @@ class PlayContext(Base):
|
|||
_private_key_file = FieldAttribute(isa='string', default=C.DEFAULT_PRIVATE_KEY_FILE)
|
||||
_timeout = FieldAttribute(isa='int', default=C.DEFAULT_TIMEOUT)
|
||||
_shell = FieldAttribute(isa='string')
|
||||
_connection_lockfd= FieldAttribute(isa='int', default=None)
|
||||
|
||||
# privilege escalation fields
|
||||
_become = FieldAttribute(isa='bool')
|
||||
|
@ -244,6 +245,11 @@ class PlayContext(Base):
|
|||
if options.connection:
|
||||
self.connection = options.connection
|
||||
|
||||
# The lock file is opened in the parent process, and the workers will
|
||||
# inherit the open file, so we just need to help them find it.
|
||||
if options.connection_lockfile:
|
||||
self.connection_lockfd = options.connection_lockfile.fileno()
|
||||
|
||||
self.remote_user = options.remote_user
|
||||
self.private_key_file = options.private_key_file
|
||||
|
||||
|
|
|
@ -155,3 +155,13 @@ class ConnectionBase(with_metaclass(ABCMeta, object)):
|
|||
if incorrect_password in output:
|
||||
raise AnsibleError('Incorrect %s password' % self._play_context.become_method)
|
||||
|
||||
def lock_connection(self):
|
||||
f = self._play_context.connection_lockfd
|
||||
self._display.vvvv('CONNECTION: pid %d waiting for lock on %d' % (os.getpid(), f))
|
||||
fcntl.lockf(f, fcntl.LOCK_EX)
|
||||
self._display.vvvv('CONNECTION: pid %d acquired lock on %d' % (os.getpid(), f))
|
||||
|
||||
def unlock_connection(self):
|
||||
f = self._play_context.connection_lockfd
|
||||
fcntl.lockf(f, fcntl.LOCK_UN)
|
||||
self._display.vvvv('CONNECTION: pid %d released lock on %d' % (os.getpid(), f))
|
||||
|
|
Loading…
Reference in a new issue