From 5887e96b270b2f0d19d3e34a09e16e21824c4887 Mon Sep 17 00:00:00 2001 From: Abhijit Menon-Sen Date: Sat, 1 Aug 2015 09:26:07 +0530 Subject: [PATCH 1/2] Introduce a connection locking infrastructure The lock file (a temporary file) is opened in the parent process; its open fd is inherited by the workers after fork and passed down through the PlayContext. Connection grows lock/unlock methods which can be used by individual connection plugins. --- lib/ansible/executor/task_queue_manager.py | 5 +++++ lib/ansible/playbook/play_context.py | 6 ++++++ lib/ansible/plugins/connections/__init__.py | 10 ++++++++++ 3 files changed, 21 insertions(+) diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index 59f48142b19..1189e35f914 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -23,6 +23,7 @@ import multiprocessing import os import socket import sys +import tempfile from ansible import constants as C from ansible.errors import AnsibleError @@ -78,6 +79,10 @@ class TaskQueueManager: self._failed_hosts = dict() self._unreachable_hosts = dict() + # A temporary file (opened pre-fork) used by connection plugins for + # inter-process locking. 
+ self._options.connection_lockfile = tempfile.TemporaryFile() + self._final_q = multiprocessing.Queue() # create the pool of worker threads, based on the number of forks specified diff --git a/lib/ansible/playbook/play_context.py b/lib/ansible/playbook/play_context.py index 355efbaf26e..d4a184fa61c 100644 --- a/lib/ansible/playbook/play_context.py +++ b/lib/ansible/playbook/play_context.py @@ -161,6 +161,7 @@ class PlayContext(Base): _private_key_file = FieldAttribute(isa='string', default=C.DEFAULT_PRIVATE_KEY_FILE) _timeout = FieldAttribute(isa='int', default=C.DEFAULT_TIMEOUT) _shell = FieldAttribute(isa='string') + _connection_lockfd= FieldAttribute(isa='int', default=None) # privilege escalation fields _become = FieldAttribute(isa='bool') @@ -244,6 +245,11 @@ class PlayContext(Base): if options.connection: self.connection = options.connection + # The lock file is opened in the parent process, and the workers will + # inherit the open file, so we just need to help them find it. + if options.connection_lockfile: + self.connection_lockfd = options.connection_lockfile.fileno() + self.remote_user = options.remote_user self.private_key_file = options.private_key_file diff --git a/lib/ansible/plugins/connections/__init__.py b/lib/ansible/plugins/connections/__init__.py index 1ad28763817..5dfcf4c344b 100644 --- a/lib/ansible/plugins/connections/__init__.py +++ b/lib/ansible/plugins/connections/__init__.py @@ -155,3 +155,13 @@ class ConnectionBase(with_metaclass(ABCMeta, object)): if incorrect_password in output: raise AnsibleError('Incorrect %s password' % self._play_context.become_method) + def lock_connection(self): + f = self._play_context.connection_lockfd + self._display.vvvv('CONNECTION: pid %d waiting for lock on %d' % (os.getpid(), f)) + fcntl.lockf(f, fcntl.LOCK_EX) + self._display.vvvv('CONNECTION: pid %d acquired lock on %d' % (os.getpid(), f)) + + def unlock_connection(self): + f = self._play_context.connection_lockfd + fcntl.lockf(f, fcntl.LOCK_UN) + 
self._display.vvvv('CONNECTION: pid %d released lock on %d' % (os.getpid(), f)) From 9378c8e2dae0e50a186181c50b3e462fcfa0fdaa Mon Sep 17 00:00:00 2001 From: Abhijit Menon-Sen Date: Wed, 2 Sep 2015 22:52:35 +0530 Subject: [PATCH 2/2] Make the paramiko plugin use locking --- .../plugins/connections/paramiko_ssh.py | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/lib/ansible/plugins/connections/paramiko_ssh.py b/lib/ansible/plugins/connections/paramiko_ssh.py index df97a6e3a58..214e6b5a03e 100644 --- a/lib/ansible/plugins/connections/paramiko_ssh.py +++ b/lib/ansible/plugins/connections/paramiko_ssh.py @@ -71,16 +71,15 @@ class MyAddPolicy(object): local L{HostKeys} object, and saving it. This is used by L{SSHClient}. """ - def __init__(self, new_stdin): + def __init__(self, new_stdin, connection): self._new_stdin = new_stdin + self.connection = connection def missing_host_key(self, client, hostname, key): if C.HOST_KEY_CHECKING: - # FIXME: need to fix lock file stuff - #fcntl.lockf(self.runner.process_lockfile, fcntl.LOCK_EX) - #fcntl.lockf(self.runner.output_lockfile, fcntl.LOCK_EX) + self.connection.lock_connection() old_stdin = sys.stdin sys.stdin = self._new_stdin @@ -94,17 +93,11 @@ class MyAddPolicy(object): inp = raw_input(AUTHENTICITY_MSG % (hostname, ktype, fingerprint)) sys.stdin = old_stdin + self.connection.unlock_connection() + if inp not in ['yes','y','']: - # FIXME: lock file stuff - #fcntl.flock(self.runner.output_lockfile, fcntl.LOCK_UN) - #fcntl.flock(self.runner.process_lockfile, fcntl.LOCK_UN) raise AnsibleError("host connection rejected by user") - # FIXME: lock file stuff - #fcntl.lockf(self.runner.output_lockfile, fcntl.LOCK_UN) - #fcntl.lockf(self.runner.process_lockfile, fcntl.LOCK_UN) - - key._added_by_ansible_this_time = True # existing implementation below: @@ -159,7 +152,7 @@ class Connection(ConnectionBase): pass # file was not found, but not required to function ssh.load_system_host_keys() - 
ssh.set_missing_host_key_policy(MyAddPolicy(self._new_stdin)) + ssh.set_missing_host_key_policy(MyAddPolicy(self._new_stdin, self)) allow_agent = True @@ -365,6 +358,9 @@ class Connection(ConnectionBase): if C.HOST_KEY_CHECKING and C.PARAMIKO_RECORD_HOST_KEYS and self._any_keys_added(): # add any new SSH host keys -- warning -- this could be slow + # (This doesn't acquire the connection lock because it needs + # to exclude only other known_hosts writers, not connections + # that are starting up.) lockfile = self.keyfile.replace("known_hosts",".known_hosts.lock") dirname = os.path.dirname(self.keyfile) makedirs_safe(dirname)