diff --git a/lib/ansible/runner/__init__.py b/lib/ansible/runner/__init__.py index 9160a0b5928..528dfb42fff 100644 --- a/lib/ansible/runner/__init__.py +++ b/lib/ansible/runner/__init__.py @@ -49,7 +49,6 @@ from ansible.module_common import ModuleReplacer module_replacer = ModuleReplacer(strip_comments=False) -NEED_ATFORK=False HAS_ATFORK=True try: from Crypto.Random import atfork @@ -61,28 +60,30 @@ multiprocessing_runner = None OUTPUT_LOCKFILE = tempfile.TemporaryFile() PROCESS_LOCKFILE = tempfile.TemporaryFile() -from foon import Foon - -FOON = Foon() - ################################################ -class KeyboardInterruptError(Exception): - pass - -def _executor_hook(params): - - (host, my_stdin) = params +def _executor_hook(job_queue, result_queue, new_stdin): # attempt workaround of https://github.com/newsapps/beeswithmachineguns/issues/17 # this function also not present in CentOS 6 - if HAS_ATFORK and NEED_ATFORK: + if HAS_ATFORK: atfork() - try: - return multiprocessing_runner._executor(host, my_stdin) - except KeyboardInterrupt: - raise KeyboardInterruptError() + signal.signal(signal.SIGINT, signal.SIG_IGN) + while not job_queue.empty(): + try: + host = job_queue.get(block=False) + return_data = multiprocessing_runner._executor(host, new_stdin) + result_queue.put(return_data) + + if 'LEGACY_TEMPLATE_WARNING' in return_data.flags: + # pass data back up across the multiprocessing fork boundary + template.Flags.LEGACY_TEMPLATE_WARNING = True + + except Queue.Empty: + pass + except: + traceback.print_exc() class HostVars(dict): ''' A special view of setup_cache that adds values from the inventory when needed. ''' @@ -208,9 +209,6 @@ class Runner(object): else: self.transport = "ssh" - if self.transport == "paramiko": - global NEED_ATFORK - NEED_ATFORK=True # misc housekeeping if subset and self.inventory._subset is None: @@ -1058,11 +1056,39 @@ class Runner(object): # ***************************************************** + def _parallel_exec(self, hosts): ''' handles mulitprocessing when more than 1 fork is required ''' - FOON.set_size(self.forks) - return FOON.map(_executor_hook, hosts) + manager = multiprocessing.Manager() + job_queue = manager.Queue() + for host in hosts: + job_queue.put(host) + result_queue = manager.Queue() + + workers = [] + for i in range(self.forks): + new_stdin = os.fdopen(os.dup(sys.stdin.fileno())) + prc = multiprocessing.Process(target=_executor_hook, + args=(job_queue, result_queue, new_stdin)) + prc.start() + workers.append(prc) + + try: + for worker in workers: + worker.join() + except KeyboardInterrupt: + for worker in workers: + worker.terminate() + worker.join() + + results = [] + try: + while not result_queue.empty(): + results.append(result_queue.get(block=False)) + except socket.error: + raise errors.AnsibleError("") + return results # *****************************************************