From dfb1c0647e907dd27509922da995932dcc6be8db Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Thu, 15 Sep 2016 16:55:54 -0500 Subject: [PATCH 1/2] Revert "Move queuing tasks to a background thread" This reverts commit b71957d6e6d666dc9594e798e4230e908c19b299. --- lib/ansible/executor/action_write_locks.py | 43 ------ lib/ansible/executor/module_common.py | 8 +- lib/ansible/executor/process/worker.py | 7 +- lib/ansible/executor/task_queue_manager.py | 128 ++---------------- lib/ansible/plugins/strategy/__init__.py | 116 ++++++++++++++-- lib/ansible/plugins/strategy/linear.py | 7 +- .../plugins/strategies/test_strategy_base.py | 69 +++++----- test/units/template/test_templar.py | 2 +- 8 files changed, 158 insertions(+), 222 deletions(-) delete mode 100644 lib/ansible/executor/action_write_locks.py diff --git a/lib/ansible/executor/action_write_locks.py b/lib/ansible/executor/action_write_locks.py deleted file mode 100644 index 413d56d9d72..00000000000 --- a/lib/ansible/executor/action_write_locks.py +++ /dev/null @@ -1,43 +0,0 @@ -# (c) 2016 - Red Hat, Inc. -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -from multiprocessing import Lock -from ansible.module_utils.facts import Facts - -if 'action_write_locks' not in globals(): - # Do not initialize this more than once because it seems to bash - # the existing one. multiprocessing must be reloading the module - # when it forks? - action_write_locks = dict() - - # Below is a Lock for use when we weren't expecting a named module. - # It gets used when an action plugin directly invokes a module instead - # of going through the strategies. Slightly less efficient as all - # processes with unexpected module names will wait on this lock - action_write_locks[None] = Lock() - - # These plugins are called directly by action plugins (not going through - # a strategy). We precreate them here as an optimization - mods = set(p['name'] for p in Facts.PKG_MGRS) - mods.update(('copy', 'file', 'setup', 'slurp', 'stat')) - for mod_name in mods: - action_write_locks[mod_name] = Lock() - diff --git a/lib/ansible/executor/module_common.py b/lib/ansible/executor/module_common.py index 86549ea0b7a..d7dac13de0a 100644 --- a/lib/ansible/executor/module_common.py +++ b/lib/ansible/executor/module_common.py @@ -36,7 +36,7 @@ from ansible.module_utils._text import to_bytes, to_text # Must import strategy and use write_locks from there # If we import write_locks directly then we end up binding a # variable to the object and then it never gets updated. 
-from ansible.executor import action_write_locks +from ansible.plugins import strategy try: from __main__ import display @@ -605,16 +605,16 @@ def _find_snippet_imports(module_name, module_data, module_path, module_args, ta display.debug('ANSIBALLZ: using cached module: %s' % cached_module_filename) zipdata = open(cached_module_filename, 'rb').read() else: - if module_name in action_write_locks.action_write_locks: + if module_name in strategy.action_write_locks: display.debug('ANSIBALLZ: Using lock for %s' % module_name) - lock = action_write_locks.action_write_locks[module_name] + lock = strategy.action_write_locks[module_name] else: # If the action plugin directly invokes the module (instead of # going through a strategy) then we don't have a cross-process # Lock specifically for this module. Use the "unexpected # module" lock instead display.debug('ANSIBALLZ: Using generic lock for %s' % module_name) - lock = action_write_locks.action_write_locks[None] + lock = strategy.action_write_locks[None] display.debug('ANSIBALLZ: Acquiring lock') with lock: diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index d93de24ab39..0647c59b413 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -55,12 +55,12 @@ class WorkerProcess(multiprocessing.Process): for reading later. ''' - def __init__(self, rslt_q, play, host, task, task_vars, play_context, loader, variable_manager, shared_loader_obj): + def __init__(self, rslt_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj): super(WorkerProcess, self).__init__() # takes a task queue manager as the sole param: self._rslt_q = rslt_q - self._play = play + self._task_vars = task_vars self._host = host self._task = task self._play_context = play_context @@ -68,8 +68,6 @@ class WorkerProcess(multiprocessing.Process): self._variable_manager = variable_manager self._shared_loader_obj = shared_loader_obj - self._task_vars = task_vars - # dupe stdin, if we have one self._new_stdin = sys.stdin try: @@ -151,4 +149,3 @@ class WorkerProcess(multiprocessing.Process): #with open('worker_%06d.stats' % os.getpid(), 'w') as f: # f.write(s.getvalue()) - sys.exit(0) diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index 6744367363f..c0032753065 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -22,22 +22,16 @@ __metaclass__ = type import multiprocessing import os import tempfile -import threading -import time - -from collections import deque from ansible import constants as C from ansible.compat.six import string_types from ansible.errors import AnsibleError -from ansible.executor import action_write_locks from ansible.executor.play_iterator import PlayIterator -from ansible.executor.process.worker import WorkerProcess from ansible.executor.stats import AggregateStats from ansible.module_utils._text import to_text from ansible.playbook.block import Block from ansible.playbook.play_context import PlayContext -from ansible.plugins import action_loader, callback_loader, connection_loader, filter_loader, lookup_loader, module_loader, strategy_loader, test_loader +from ansible.plugins import callback_loader, strategy_loader, module_loader from ansible.plugins.callback import CallbackBase from ansible.template import Templar from ansible.utils.helpers import pct_to_int @@ -52,23 +46,6 @@ except ImportError: __all__ = ['TaskQueueManager'] -# TODO: this 
should probably be in the plugins/__init__.py, with -# a smarter mechanism to set all of the attributes based on -# the loaders created there -class SharedPluginLoaderObj: - ''' - A simple object to make pass the various plugin loaders to - the forked processes over the queue easier - ''' - def __init__(self): - self.action_loader = action_loader - self.connection_loader = connection_loader - self.filter_loader = filter_loader - self.test_loader = test_loader - self.lookup_loader = lookup_loader - self.module_loader = module_loader - - class TaskQueueManager: ''' @@ -100,8 +77,6 @@ class TaskQueueManager: self._run_additional_callbacks = run_additional_callbacks self._run_tree = run_tree - self._iterator = None - self._callbacks_loaded = False self._callback_plugins = [] self._start_at_done = False @@ -123,86 +98,12 @@ class TaskQueueManager: self._failed_hosts = dict() self._unreachable_hosts = dict() - # the "queue" for the background thread to use - self._queued_tasks = deque() - self._queued_tasks_lock = threading.Lock() - - # the background queuing thread - self._queue_thread = None - - self._workers = [] self._final_q = multiprocessing.Queue() # A temporary file (opened pre-fork) used by connection # plugins for inter-process locking. self._connection_lockfile = tempfile.TemporaryFile() - def _queue_thread_main(self): - - # create a dummy object with plugin loaders set as an easier - # way to share them with the forked processes - shared_loader_obj = SharedPluginLoaderObj() - - display.debug("queuing thread starting") - while not self._terminated: - available_workers = [] - for idx, entry in enumerate(self._workers): - (worker_prc, _) = entry - if worker_prc is None or not worker_prc.is_alive(): - available_workers.append(idx) - - if len(available_workers) == 0: - time.sleep(0.01) - continue - - for worker_idx in available_workers: - try: - self._queued_tasks_lock.acquire() - (host, task, task_vars, play_context) = self._queued_tasks.pop() - except IndexError: - break - finally: - self._queued_tasks_lock.release() - - if task.action not in action_write_locks.action_write_locks: - display.debug('Creating lock for %s' % task.action) - action_write_locks.action_write_locks[task.action] = multiprocessing.Lock() - - try: - worker_prc = WorkerProcess( - self._final_q, - self._iterator._play, - host, - task, - task_vars, - play_context, - self._loader, - self._variable_manager, - shared_loader_obj, - ) - self._workers[worker_idx][0] = worker_prc - worker_prc.start() - display.debug("worker is %d (out of %d available)" % (worker_idx+1, len(self._workers))) - - except (EOFError, IOError, AssertionError) as e: - # most likely an abort - display.debug("got an error while queuing: %s" % e) - break - - display.debug("queuing thread exiting") - - def queue_task(self, host, task, task_vars, play_context): - self._queued_tasks_lock.acquire() - self._queued_tasks.append((host, task, task_vars, play_context)) - self._queued_tasks_lock.release() - - def queue_multiple_tasks(self, items, play_context): - for item in items: - (host, task, task_vars) = item - self._queued_tasks_lock.acquire() - self._queued_tasks.append((host, task, task_vars, play_context)) - self._queued_tasks_lock.release() - def _initialize_processes(self, num): self._workers = [] @@ -307,10 +208,6 @@ class TaskQueueManager: if not self._callbacks_loaded: self.load_callbacks() - if self._queue_thread is None: - self._queue_thread = threading.Thread(target=self._queue_thread_main) - self._queue_thread.start() - all_vars = 
self._variable_manager.get_vars(loader=self._loader, play=play) templar = Templar(loader=self._loader, variables=all_vars) @@ -356,7 +253,7 @@ class TaskQueueManager: raise AnsibleError("Invalid play strategy specified: %s" % new_play.strategy, obj=play._ds) # build the iterator - self._iterator = PlayIterator( + iterator = PlayIterator( inventory=self._inventory, play=new_play, play_context=play_context, @@ -371,7 +268,7 @@ class TaskQueueManager: # hosts so we know what failed this round. for host_name in self._failed_hosts.keys(): host = self._inventory.get_host(host_name) - self._iterator.mark_host_failed(host) + iterator.mark_host_failed(host) self.clear_failed_hosts() @@ -382,10 +279,10 @@ class TaskQueueManager: self._start_at_done = True # and run the play using the strategy and cleanup on way out - play_return = strategy.run(self._iterator, play_context) + play_return = strategy.run(iterator, play_context) # now re-save the hosts that failed from the iterator to our internal list - for host_name in self._iterator.get_failed_hosts(): + for host_name in iterator.get_failed_hosts(): self._failed_hosts[host_name] = True self._cleanup_processes() @@ -398,13 +295,14 @@ class TaskQueueManager: self._cleanup_processes() def _cleanup_processes(self): - for (worker_prc, rslt_q) in self._workers: - rslt_q.close() - if worker_prc and worker_prc.is_alive(): - try: - worker_prc.terminate() - except AttributeError: - pass + if hasattr(self, '_workers'): + for (worker_prc, rslt_q) in self._workers: + rslt_q.close() + if worker_prc and worker_prc.is_alive(): + try: + worker_prc.terminate() + except AttributeError: + pass def clear_failed_hosts(self): self._failed_hosts = dict() diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index d1036d4a662..aaa164ee75b 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -19,19 +19,24 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type +import time + +from multiprocessing import Lock from jinja2.exceptions import UndefinedError from ansible.compat.six.moves import queue as Queue from ansible.compat.six import iteritems, string_types from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable +from ansible.executor.process.worker import WorkerProcess from ansible.executor.task_result import TaskResult from ansible.inventory.host import Host from ansible.inventory.group import Group +from ansible.module_utils.facts import Facts from ansible.playbook.helpers import load_list_of_blocks from ansible.playbook.included_file import IncludedFile from ansible.playbook.task_include import TaskInclude from ansible.playbook.role_include import IncludeRole -from ansible.plugins import action_loader +from ansible.plugins import action_loader, connection_loader, filter_loader, lookup_loader, module_loader, test_loader from ansible.template import Templar from ansible.vars import combine_vars, strip_internal_keys from ansible.module_utils._text import to_text @@ -45,6 +50,41 @@ except ImportError: __all__ = ['StrategyBase'] +if 'action_write_locks' not in globals(): + # Do not initialize this more than once because it seems to bash + # the existing one. multiprocessing must be reloading the module + # when it forks? + action_write_locks = dict() + + # Below is a Lock for use when we weren't expecting a named module. 
+ # It gets used when an action plugin directly invokes a module instead + # of going through the strategies. Slightly less efficient as all + # processes with unexpected module names will wait on this lock + action_write_locks[None] = Lock() + + # These plugins are called directly by action plugins (not going through + # a strategy). We precreate them here as an optimization + mods = set(p['name'] for p in Facts.PKG_MGRS) + mods.update(('copy', 'file', 'setup', 'slurp', 'stat')) + for mod_name in mods: + action_write_locks[mod_name] = Lock() + +# TODO: this should probably be in the plugins/__init__.py, with +# a smarter mechanism to set all of the attributes based on +# the loaders created there +class SharedPluginLoaderObj: + ''' + A simple object to make pass the various plugin loaders to + the forked processes over the queue easier + ''' + def __init__(self): + self.action_loader = action_loader + self.connection_loader = connection_loader + self.filter_loader = filter_loader + self.test_loader = test_loader + self.lookup_loader = lookup_loader + self.module_loader = module_loader + class StrategyBase: @@ -56,6 +96,7 @@ class StrategyBase: def __init__(self, tqm): self._tqm = tqm self._inventory = tqm.get_inventory() + self._workers = tqm.get_workers() self._notified_handlers = tqm._notified_handlers self._listening_handlers = tqm._listening_handlers self._variable_manager = tqm.get_variable_manager() @@ -68,6 +109,7 @@ class StrategyBase: # internal counters self._pending_results = 0 + self._cur_worker = 0 # this dictionary is used to keep track of hosts that have # outstanding tasks still in queue @@ -118,10 +160,58 @@ class StrategyBase: def _queue_task(self, host, task, task_vars, play_context): ''' handles queueing the task up to be sent to a worker ''' - self._tqm.queue_task(host, task, task_vars, play_context) - self._pending_results += 1 - def _process_pending_results(self, iterator, one_pass=False, timeout=0.001): + display.debug("entering _queue_task() for %s/%s" % (host.name, task.action)) + + # Add a write lock for tasks. + # Maybe this should be added somewhere further up the call stack but + # this is the earliest in the code where we have task (1) extracted + # into its own variable and (2) there's only a single code path + # leading to the module being run. This is called by three + # functions: __init__.py::_do_handler_run(), linear.py::run(), and + # free.py::run() so we'd have to add to all three to do it there. + # The next common higher level is __init__.py::run() and that has + # tasks inside of play_iterator so we'd have to extract them to do it + # there. 
+ + global action_write_locks + if task.action not in action_write_locks: + display.debug('Creating lock for %s' % task.action) + action_write_locks[task.action] = Lock() + + # and then queue the new task + try: + + # create a dummy object with plugin loaders set as an easier + # way to share them with the forked processes + shared_loader_obj = SharedPluginLoaderObj() + + queued = False + starting_worker = self._cur_worker + while True: + (worker_prc, rslt_q) = self._workers[self._cur_worker] + if worker_prc is None or not worker_prc.is_alive(): + worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj) + self._workers[self._cur_worker][0] = worker_prc + worker_prc.start() + display.debug("worker is %d (out of %d available)" % (self._cur_worker+1, len(self._workers))) + queued = True + self._cur_worker += 1 + if self._cur_worker >= len(self._workers): + self._cur_worker = 0 + if queued: + break + elif self._cur_worker == starting_worker: + time.sleep(0.0001) + + self._pending_results += 1 + except (EOFError, IOError, AssertionError) as e: + # most likely an abort + display.debug("got an error while queuing: %s" % e) + return + display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action)) + + def _process_pending_results(self, iterator, one_pass=False): ''' Reads results off the final queue and takes appropriate action based on the result (executing callbacks, updating state, etc.). @@ -180,10 +270,10 @@ class StrategyBase: else: return False - passes = 1 - while not self._tqm._terminated and passes < 3: + passes = 0 + while not self._tqm._terminated: try: - task_result = self._final_q.get(timeout=timeout) + task_result = self._final_q.get(timeout=0.001) original_host = get_original_host(task_result._host) original_task = iterator.get_original_task(original_host, task_result._task) task_result._host = original_host @@ -396,6 +486,8 @@ class StrategyBase: except Queue.Empty: passes += 1 + if passes > 2: + break if one_pass: break @@ -411,18 +503,14 @@ class StrategyBase: ret_results = [] display.debug("waiting for pending results...") - dead_check = 10 while self._pending_results > 0 and not self._tqm._terminated: + if self._tqm.has_dead_workers(): + raise AnsibleError("A worker was found in a dead state") + results = self._process_pending_results(iterator) ret_results.extend(results) - dead_check -= 1 - if dead_check == 0: - if self._pending_results > 0 and self._tqm.has_dead_workers(): - raise AnsibleError("A worker was found in a dead state") - dead_check = 10 - display.debug("no more pending results, returning what we have") return ret_results diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py index 08b7a52c565..c6ef0a347b9 100644 --- a/lib/ansible/plugins/strategy/linear.py +++ b/lib/ansible/plugins/strategy/linear.py @@ -182,9 +182,7 @@ class StrategyModule(StrategyBase): any_errors_fatal = False results = [] - items_to_queue = [] for (host, task) in host_tasks: - if not task: continue @@ -258,8 +256,7 @@ class StrategyModule(StrategyBase): display.debug("sending task start callback") self._blocked_hosts[host.get_name()] = True - items_to_queue.append((host, task, task_vars)) - self._pending_results += 1 + self._queue_task(host, task, task_vars, play_context) del task_vars # if we're bypassing the host loop, break out now @@ -271,8 +268,6 @@ class StrategyModule(StrategyBase): # queue for the main thread results += 
self._process_pending_results(iterator, one_pass=True) - self._tqm.queue_multiple_tasks(items_to_queue, play_context) - # go to next host/task group if skip_rest: continue diff --git a/test/units/plugins/strategies/test_strategy_base.py b/test/units/plugins/strategies/test_strategy_base.py index 2f7a13f1ccc..49fd095bd49 100644 --- a/test/units/plugins/strategies/test_strategy_base.py +++ b/test/units/plugins/strategies/test_strategy_base.py @@ -121,44 +121,45 @@ class TestStrategyBase(unittest.TestCase): mock_tqm._unreachable_hosts = ["host02"] self.assertEqual(strategy_base.get_hosts_remaining(play=mock_play), mock_hosts[2:]) - #@patch.object(WorkerProcess, 'run') - #def test_strategy_base_queue_task(self, mock_worker): - # def fake_run(self): - # return + @patch.object(WorkerProcess, 'run') + def test_strategy_base_queue_task(self, mock_worker): + def fake_run(self): + return - # mock_worker.run.side_effect = fake_run + mock_worker.run.side_effect = fake_run - # fake_loader = DictDataLoader() - # mock_var_manager = MagicMock() - # mock_host = MagicMock() - # mock_host.has_hostkey = True - # mock_inventory = MagicMock() - # mock_options = MagicMock() - # mock_options.module_path = None + fake_loader = DictDataLoader() + mock_var_manager = MagicMock() + mock_host = MagicMock() + mock_host.has_hostkey = True + mock_inventory = MagicMock() + mock_options = MagicMock() + mock_options.module_path = None - # tqm = TaskQueueManager( - # inventory=mock_inventory, - # variable_manager=mock_var_manager, - # loader=fake_loader, - # options=mock_options, - # passwords=None, - # ) - # tqm._initialize_processes(3) - # tqm.hostvars = dict() + tqm = TaskQueueManager( + inventory=mock_inventory, + variable_manager=mock_var_manager, + loader=fake_loader, + options=mock_options, + passwords=None, + ) + tqm._initialize_processes(3) + tqm.hostvars = dict() - # try: - # strategy_base = StrategyBase(tqm=tqm) - # strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) - # self.assertEqual(strategy_base._cur_worker, 1) - # self.assertEqual(strategy_base._pending_results, 1) - # strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) - # self.assertEqual(strategy_base._cur_worker, 2) - # self.assertEqual(strategy_base._pending_results, 2) - # strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) - # self.assertEqual(strategy_base._cur_worker, 0) - # self.assertEqual(strategy_base._pending_results, 3) - # finally: - # tqm.cleanup() + try: + strategy_base = StrategyBase(tqm=tqm) + strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) + self.assertEqual(strategy_base._cur_worker, 1) + self.assertEqual(strategy_base._pending_results, 1) + strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) + self.assertEqual(strategy_base._cur_worker, 2) + self.assertEqual(strategy_base._pending_results, 2) + strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) + self.assertEqual(strategy_base._cur_worker, 0) + self.assertEqual(strategy_base._pending_results, 3) + finally: + tqm.cleanup() + def test_strategy_base_process_pending_results(self): mock_tqm = MagicMock() diff --git a/test/units/template/test_templar.py b/test/units/template/test_templar.py index 481dc3e8d50..2ec8f54e0c8 100644 --- a/test/units/template/test_templar.py +++ 
b/test/units/template/test_templar.py @@ -25,7 +25,7 @@ from ansible.compat.tests.mock import patch, MagicMock from ansible import constants as C from ansible.errors import * from ansible.plugins import filter_loader, lookup_loader, module_loader -from ansible.executor.task_queue_manager import SharedPluginLoaderObj +from ansible.plugins.strategy import SharedPluginLoaderObj from ansible.template import Templar from units.mock.loader import DictDataLoader From 5a57c66e3c14e26fa5f20f1ce9bb038c667b19e5 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Fri, 16 Sep 2016 00:14:53 -0500 Subject: [PATCH 2/2] Moving result reading to a background thread --- lib/ansible/executor/action_write_locks.py | 43 ++ lib/ansible/executor/module_common.py | 8 +- lib/ansible/executor/task_queue_manager.py | 1 + lib/ansible/playbook/conditional.py | 3 +- lib/ansible/plugins/strategy/__init__.py | 476 +++++++++--------- lib/ansible/plugins/strategy/linear.py | 8 +- .../plugins/strategies/test_strategy_base.py | 89 +++- 7 files changed, 382 insertions(+), 246 deletions(-) create mode 100644 lib/ansible/executor/action_write_locks.py diff --git a/lib/ansible/executor/action_write_locks.py b/lib/ansible/executor/action_write_locks.py new file mode 100644 index 00000000000..413d56d9d72 --- /dev/null +++ b/lib/ansible/executor/action_write_locks.py @@ -0,0 +1,43 @@ +# (c) 2016 - Red Hat, Inc. +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from multiprocessing import Lock +from ansible.module_utils.facts import Facts + +if 'action_write_locks' not in globals(): + # Do not initialize this more than once because it seems to bash + # the existing one. multiprocessing must be reloading the module + # when it forks? + action_write_locks = dict() + + # Below is a Lock for use when we weren't expecting a named module. + # It gets used when an action plugin directly invokes a module instead + # of going through the strategies. Slightly less efficient as all + # processes with unexpected module names will wait on this lock + action_write_locks[None] = Lock() + + # These plugins are called directly by action plugins (not going through + # a strategy). We precreate them here as an optimization + mods = set(p['name'] for p in Facts.PKG_MGRS) + mods.update(('copy', 'file', 'setup', 'slurp', 'stat')) + for mod_name in mods: + action_write_locks[mod_name] = Lock() + diff --git a/lib/ansible/executor/module_common.py b/lib/ansible/executor/module_common.py index d7dac13de0a..86549ea0b7a 100644 --- a/lib/ansible/executor/module_common.py +++ b/lib/ansible/executor/module_common.py @@ -36,7 +36,7 @@ from ansible.module_utils._text import to_bytes, to_text # Must import strategy and use write_locks from there # If we import write_locks directly then we end up binding a # variable to the object and then it never gets updated. 
-from ansible.plugins import strategy +from ansible.executor import action_write_locks try: from __main__ import display @@ -605,16 +605,16 @@ def _find_snippet_imports(module_name, module_data, module_path, module_args, ta display.debug('ANSIBALLZ: using cached module: %s' % cached_module_filename) zipdata = open(cached_module_filename, 'rb').read() else: - if module_name in strategy.action_write_locks: + if module_name in action_write_locks.action_write_locks: display.debug('ANSIBALLZ: Using lock for %s' % module_name) - lock = strategy.action_write_locks[module_name] + lock = action_write_locks.action_write_locks[module_name] else: # If the action plugin directly invokes the module (instead of # going through a strategy) then we don't have a cross-process # Lock specifically for this module. Use the "unexpected # module" lock instead display.debug('ANSIBALLZ: Using generic lock for %s' % module_name) - lock = strategy.action_write_locks[None] + lock = action_write_locks.action_write_locks[None] display.debug('ANSIBALLZ: Acquiring lock') with lock: diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index c0032753065..2e6948f1e0c 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -285,6 +285,7 @@ class TaskQueueManager: for host_name in iterator.get_failed_hosts(): self._failed_hosts[host_name] = True + strategy.cleanup() self._cleanup_processes() return play_return diff --git a/lib/ansible/playbook/conditional.py b/lib/ansible/playbook/conditional.py index 14f50f88295..1fb54df9982 100644 --- a/lib/ansible/playbook/conditional.py +++ b/lib/ansible/playbook/conditional.py @@ -25,6 +25,7 @@ from ansible.compat.six import text_type from ansible.errors import AnsibleError, AnsibleUndefinedVariable from ansible.playbook.attribute import FieldAttribute from ansible.template import Templar +from ansible.module_utils._text import to_native class Conditional: @@ -72,7 +73,7 @@ class Conditional: if not self._check_conditional(conditional, templar, all_vars): return False except Exception as e: - raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (conditional, e), obj=ds) + raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)), obj=ds) return True diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index aaa164ee75b..d6db1dbde87 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -19,14 +19,18 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type +import os +import threading import time +from collections import deque from multiprocessing import Lock from jinja2.exceptions import UndefinedError from ansible.compat.six.moves import queue as Queue from ansible.compat.six import iteritems, string_types from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable +from ansible.executor import action_write_locks from ansible.executor.process.worker import WorkerProcess from ansible.executor.task_result import TaskResult from ansible.inventory.host import Host @@ -50,25 +54,6 @@ except ImportError: __all__ = ['StrategyBase'] -if 'action_write_locks' not in globals(): - # Do not initialize this more than once because it seems to bash - # the existing one. multiprocessing must be reloading the module - # when it forks? 
- action_write_locks = dict() - - # Below is a Lock for use when we weren't expecting a named module. - # It gets used when an action plugin directly invokes a module instead - # of going through the strategies. Slightly less efficient as all - # processes with unexpected module names will wait on this lock - action_write_locks[None] = Lock() - - # These plugins are called directly by action plugins (not going through - # a strategy). We precreate them here as an optimization - mods = set(p['name'] for p in Facts.PKG_MGRS) - mods.update(('copy', 'file', 'setup', 'slurp', 'stat')) - for mod_name in mods: - action_write_locks[mod_name] = Lock() - # TODO: this should probably be in the plugins/__init__.py, with # a smarter mechanism to set all of the attributes based on # the loaders created there @@ -86,6 +71,25 @@ class SharedPluginLoaderObj: self.module_loader = module_loader +_sentinel = object() +def results_thread_main(strategy): + #print("RESULT THREAD STARTING: %s" % threading.current_thread()) + while True: + try: + result = strategy._final_q.get() + if type(result) == object: + break + else: + #print("result in thread is: %s" % result._result) + strategy._results_lock.acquire() + strategy._results.append(result) + strategy._results_lock.release() + except (IOError, EOFError): + break + except Queue.Empty: + pass + #print("RESULT THREAD EXITED: %s" % threading.current_thread()) + class StrategyBase: ''' @@ -104,6 +108,7 @@ class StrategyBase: self._final_q = tqm._final_q self._step = getattr(tqm._options, 'step', False) self._diff = getattr(tqm._options, 'diff', False) + # Backwards compat: self._display isn't really needed, just import the global display and use that. self._display = display @@ -115,6 +120,18 @@ class StrategyBase: # outstanding tasks still in queue self._blocked_hosts = dict() + self._results = deque() + self._results_lock = threading.Condition(threading.Lock()) + + #print("creating thread for strategy %s" % id(self)) + self._results_thread = threading.Thread(target=results_thread_main, args=(self,)) + self._results_thread.daemon = True + self._results_thread.start() + + def cleanup(self): + self._final_q.put(_sentinel) + self._results_thread.join() + def run(self, iterator, play_context, result=0): # save the failed/unreachable hosts, as the run_handlers() # method will clear that information during its execution @@ -174,10 +191,9 @@ class StrategyBase: # tasks inside of play_iterator so we'd have to extract them to do it # there. - global action_write_locks - if task.action not in action_write_locks: + if task.action not in action_write_locks.action_write_locks: display.debug('Creating lock for %s' % task.action) - action_write_locks[task.action] = Lock() + action_write_locks.action_write_locks[task.action] = Lock() # and then queue the new task try: @@ -211,7 +227,7 @@ class StrategyBase: return display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action)) - def _process_pending_results(self, iterator, one_pass=False): + def _process_pending_results(self, iterator, one_pass=False, max_passes=None): ''' Reads results off the final queue and takes appropriate action based on the result (executing callbacks, updating state, etc.). 
@@ -270,228 +286,232 @@ class StrategyBase: else: return False - passes = 0 - while not self._tqm._terminated: + cur_pass = 0 + while True: try: - task_result = self._final_q.get(timeout=0.001) - original_host = get_original_host(task_result._host) - original_task = iterator.get_original_task(original_host, task_result._task) - task_result._host = original_host - task_result._task = original_task + self._results_lock.acquire() + task_result = self._results.pop() + except IndexError: + break + finally: + self._results_lock.release() - # send callbacks for 'non final' results - if '_ansible_retry' in task_result._result: - self._tqm.send_callback('v2_runner_retry', task_result) - continue - elif '_ansible_item_result' in task_result._result: - if task_result.is_failed() or task_result.is_unreachable(): - self._tqm.send_callback('v2_runner_item_on_failed', task_result) - elif task_result.is_skipped(): - self._tqm.send_callback('v2_runner_item_on_skipped', task_result) - else: - if 'diff' in task_result._result: - if self._diff: - self._tqm.send_callback('v2_on_file_diff', task_result) - self._tqm.send_callback('v2_runner_item_on_ok', task_result) - continue + original_host = get_original_host(task_result._host) + original_task = iterator.get_original_task(original_host, task_result._task) + task_result._host = original_host + task_result._task = original_task - if original_task.register: - if original_task.run_once: - host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] - else: - host_list = [original_host] - - clean_copy = strip_internal_keys(task_result._result) - if 'invocation' in clean_copy: - del clean_copy['invocation'] - - for target_host in host_list: - self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy}) - - # all host status messages contain 2 entries: (msg, task_result) - role_ran = False - if task_result.is_failed(): - role_ran = True - if not original_task.ignore_errors: - display.debug("marking %s as failed" % original_host.name) - if original_task.run_once: - # if we're using run_once, we have to fail every host here - for h in self._inventory.get_hosts(iterator._play.hosts): - if h.name not in self._tqm._unreachable_hosts: - state, _ = iterator.get_next_task_for_host(h, peek=True) - iterator.mark_host_failed(h) - state, new_task = iterator.get_next_task_for_host(h, peek=True) - else: - iterator.mark_host_failed(original_host) - - # only add the host to the failed list officially if it has - # been failed by the iterator - if iterator.is_failed(original_host): - self._tqm._failed_hosts[original_host.name] = True - self._tqm._stats.increment('failures', original_host.name) - else: - # otherwise, we grab the current state and if we're iterating on - # the rescue portion of a block then we save the failed task in a - # special var for use within the rescue/always - state, _ = iterator.get_next_task_for_host(original_host, peek=True) - if state.run_state == iterator.ITERATING_RESCUE: - self._variable_manager.set_nonpersistent_facts( - original_host, - dict( - ansible_failed_task=original_task.serialize(), - ansible_failed_result=task_result._result, - ), - ) - else: - self._tqm._stats.increment('ok', original_host.name) - self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors) - elif task_result.is_unreachable(): - self._tqm._unreachable_hosts[original_host.name] = True - self._tqm._stats.increment('dark', 
original_host.name) - self._tqm.send_callback('v2_runner_on_unreachable', task_result) + # send callbacks for 'non final' results + if '_ansible_retry' in task_result._result: + self._tqm.send_callback('v2_runner_retry', task_result) + continue + elif '_ansible_item_result' in task_result._result: + if task_result.is_failed() or task_result.is_unreachable(): + self._tqm.send_callback('v2_runner_item_on_failed', task_result) elif task_result.is_skipped(): - self._tqm._stats.increment('skipped', original_host.name) - self._tqm.send_callback('v2_runner_on_skipped', task_result) + self._tqm.send_callback('v2_runner_item_on_skipped', task_result) else: - role_ran = True - - if original_task.loop: - # this task had a loop, and has more than one result, so - # loop over all of them instead of a single result - result_items = task_result._result.get('results', []) - else: - result_items = [ task_result._result ] - - for result_item in result_items: - if '_ansible_notify' in result_item: - if task_result.is_changed(): - # The shared dictionary for notified handlers is a proxy, which - # does not detect when sub-objects within the proxy are modified. - # So, per the docs, we reassign the list so the proxy picks up and - # notifies all other threads - for handler_name in result_item['_ansible_notify']: - # Find the handler using the above helper. First we look up the - # dependency chain of the current task (if it's from a role), otherwise - # we just look through the list of handlers in the current play/all - # roles and use the first one that matches the notify name - if handler_name in self._listening_handlers: - for listening_handler_name in self._listening_handlers[handler_name]: - listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers) - if listening_handler is None: - raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name) - if original_host not in self._notified_handlers[listening_handler]: - self._notified_handlers[listening_handler].append(original_host) - display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,)) - - else: - target_handler = search_handler_blocks(handler_name, iterator._play.handlers) - if target_handler is not None: - if original_host not in self._notified_handlers[target_handler]: - self._notified_handlers[target_handler].append(original_host) - # FIXME: should this be a callback? 
- display.vv("NOTIFIED HANDLER %s" % (handler_name,)) - else: - # As there may be more than one handler with the notified name as the - # parent, so we just keep track of whether or not we found one at all - found = False - for target_handler in self._notified_handlers: - if parent_handler_match(target_handler, handler_name): - self._notified_handlers[target_handler].append(original_host) - display.vv("NOTIFIED HANDLER %s" % (target_handler.get_name(),)) - found = True - - # and if none were found, then we raise an error - if not found: - raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name) - - - if 'add_host' in result_item: - # this task added a new host (add_host module) - new_host_info = result_item.get('add_host', dict()) - self._add_host(new_host_info, iterator) - - elif 'add_group' in result_item: - # this task added a new group (group_by module) - self._add_group(original_host, result_item) - - elif 'ansible_facts' in result_item: - loop_var = 'item' - if original_task.loop_control: - loop_var = original_task.loop_control.loop_var or 'item' - - item = result_item.get(loop_var, None) - - if original_task.action == 'include_vars': - for (var_name, var_value) in iteritems(result_item['ansible_facts']): - # find the host we're actually refering too here, which may - # be a host that is not really in inventory at all - if original_task.delegate_to is not None and original_task.delegate_facts: - task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=original_task) - self.add_tqm_variables(task_vars, play=iterator._play) - if item is not None: - task_vars[loop_var] = item - templar = Templar(loader=self._loader, variables=task_vars) - host_name = templar.template(original_task.delegate_to) - actual_host = self._inventory.get_host(host_name) - if actual_host is None: - actual_host = Host(name=host_name) - else: - actual_host = original_host - - if original_task.run_once: - host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] - else: - host_list = [actual_host] - - for target_host in host_list: - self._variable_manager.set_host_variable(target_host, var_name, var_value) - else: - if original_task.run_once: - host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] - else: - host_list = [original_host] - - for target_host in host_list: - if original_task.action == 'set_fact': - self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy()) - else: - self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy()) - if 'diff' in task_result._result: if self._diff: self._tqm.send_callback('v2_on_file_diff', task_result) + self._tqm.send_callback('v2_runner_item_on_ok', task_result) + continue - if original_task.action not in ['include', 'include_role']: - self._tqm._stats.increment('ok', original_host.name) - if 'changed' in task_result._result and task_result._result['changed']: - self._tqm._stats.increment('changed', original_host.name) + if original_task.register: + #print("^ REGISTERING RESULT %s" % original_task.register) + if original_task.run_once: + host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] + else: + host_list = [original_host] - # finally, send the ok for this task - 
self._tqm.send_callback('v2_runner_on_ok', task_result) + clean_copy = strip_internal_keys(task_result._result) + if 'invocation' in clean_copy: + del clean_copy['invocation'] - self._pending_results -= 1 - if original_host.name in self._blocked_hosts: - del self._blocked_hosts[original_host.name] + for target_host in host_list: + self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy}) - # If this is a role task, mark the parent role as being run (if - # the task was ok or failed, but not skipped or unreachable) - if original_task._role is not None and role_ran: #TODO: and original_task.action != 'include_role':? - # lookup the role in the ROLE_CACHE to make sure we're dealing - # with the correct object and mark it as executed - for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[original_task._role._role_name]): - if role_obj._uuid == original_task._role._uuid: - role_obj._had_task_run[original_host.name] = True + # all host status messages contain 2 entries: (msg, task_result) + role_ran = False + if task_result.is_failed(): + role_ran = True + if not original_task.ignore_errors: + display.debug("marking %s as failed" % original_host.name) + if original_task.run_once: + # if we're using run_once, we have to fail every host here + for h in self._inventory.get_hosts(iterator._play.hosts): + if h.name not in self._tqm._unreachable_hosts: + state, _ = iterator.get_next_task_for_host(h, peek=True) + iterator.mark_host_failed(h) + state, new_task = iterator.get_next_task_for_host(h, peek=True) + else: + iterator.mark_host_failed(original_host) - ret_results.append(task_result) + # only add the host to the failed list officially if it has + # been failed by the iterator + if iterator.is_failed(original_host): + self._tqm._failed_hosts[original_host.name] = True + self._tqm._stats.increment('failures', original_host.name) + else: + # otherwise, we grab the current state and if we're iterating on + # the rescue portion of a block then we save the failed task in a + # special var for use within the rescue/always + state, _ = iterator.get_next_task_for_host(original_host, peek=True) + if state.run_state == iterator.ITERATING_RESCUE: + self._variable_manager.set_nonpersistent_facts( + original_host, + dict( + ansible_failed_task=original_task.serialize(), + ansible_failed_result=task_result._result, + ), + ) + else: + self._tqm._stats.increment('ok', original_host.name) + self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors) + elif task_result.is_unreachable(): + self._tqm._unreachable_hosts[original_host.name] = True + self._tqm._stats.increment('dark', original_host.name) + self._tqm.send_callback('v2_runner_on_unreachable', task_result) + elif task_result.is_skipped(): + self._tqm._stats.increment('skipped', original_host.name) + self._tqm.send_callback('v2_runner_on_skipped', task_result) + else: + role_ran = True - except Queue.Empty: - passes += 1 - if passes > 2: - break + if original_task.loop: + # this task had a loop, and has more than one result, so + # loop over all of them instead of a single result + result_items = task_result._result.get('results', []) + else: + result_items = [ task_result._result ] - if one_pass: + for result_item in result_items: + if '_ansible_notify' in result_item: + if task_result.is_changed(): + # The shared dictionary for notified handlers is a proxy, which + # does not detect when sub-objects within the proxy are modified. 
+ # So, per the docs, we reassign the list so the proxy picks up and + # notifies all other threads + for handler_name in result_item['_ansible_notify']: + # Find the handler using the above helper. First we look up the + # dependency chain of the current task (if it's from a role), otherwise + # we just look through the list of handlers in the current play/all + # roles and use the first one that matches the notify name + if handler_name in self._listening_handlers: + for listening_handler_name in self._listening_handlers[handler_name]: + listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers) + if listening_handler is None: + raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name) + if original_host not in self._notified_handlers[listening_handler]: + self._notified_handlers[listening_handler].append(original_host) + display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,)) + + else: + target_handler = search_handler_blocks(handler_name, iterator._play.handlers) + if target_handler is not None: + if original_host not in self._notified_handlers[target_handler]: + self._notified_handlers[target_handler].append(original_host) + # FIXME: should this be a callback? + display.vv("NOTIFIED HANDLER %s" % (handler_name,)) + else: + # As there may be more than one handler with the notified name as the + # parent, so we just keep track of whether or not we found one at all + found = False + for target_handler in self._notified_handlers: + if parent_handler_match(target_handler, handler_name): + self._notified_handlers[target_handler].append(original_host) + display.vv("NOTIFIED HANDLER %s" % (target_handler.get_name(),)) + found = True + + # and if none were found, then we raise an error + if not found: + raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name) + + + if 'add_host' in result_item: + # this task added a new host (add_host module) + new_host_info = result_item.get('add_host', dict()) + self._add_host(new_host_info, iterator) + + elif 'add_group' in result_item: + # this task added a new group (group_by module) + self._add_group(original_host, result_item) + + elif 'ansible_facts' in result_item: + loop_var = 'item' + if original_task.loop_control: + loop_var = original_task.loop_control.loop_var or 'item' + + item = result_item.get(loop_var, None) + + if original_task.action == 'include_vars': + for (var_name, var_value) in iteritems(result_item['ansible_facts']): + # find the host we're actually refering too here, which may + # be a host that is not really in inventory at all + if original_task.delegate_to is not None and original_task.delegate_facts: + task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=original_task) + self.add_tqm_variables(task_vars, play=iterator._play) + if item is not None: + task_vars[loop_var] = item + templar = Templar(loader=self._loader, variables=task_vars) + host_name = templar.template(original_task.delegate_to) + actual_host = self._inventory.get_host(host_name) + if actual_host is None: + actual_host = Host(name=host_name) + else: + actual_host = original_host + + if original_task.run_once: + host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] + else: + host_list = [actual_host] + + for target_host in host_list: + 
self._variable_manager.set_host_variable(target_host, var_name, var_value) + else: + if original_task.run_once: + host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] + else: + host_list = [original_host] + + for target_host in host_list: + if original_task.action == 'set_fact': + self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy()) + else: + self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy()) + + if 'diff' in task_result._result: + if self._diff: + self._tqm.send_callback('v2_on_file_diff', task_result) + + if original_task.action not in ['include', 'include_role']: + self._tqm._stats.increment('ok', original_host.name) + if 'changed' in task_result._result and task_result._result['changed']: + self._tqm._stats.increment('changed', original_host.name) + + # finally, send the ok for this task + self._tqm.send_callback('v2_runner_on_ok', task_result) + + self._pending_results -= 1 + if original_host.name in self._blocked_hosts: + del self._blocked_hosts[original_host.name] + + # If this is a role task, mark the parent role as being run (if + # the task was ok or failed, but not skipped or unreachable) + if original_task._role is not None and role_ran: #TODO: and original_task.action != 'include_role':? + # lookup the role in the ROLE_CACHE to make sure we're dealing + # with the correct object and mark it as executed + for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[original_task._role._role_name]): + if role_obj._uuid == original_task._role._uuid: + role_obj._had_task_run[original_host.name] = True + + ret_results.append(task_result) + + if one_pass or max_passes is not None and (cur_pass+1) >= max_passes: break + cur_pass += 1 + return ret_results def _wait_on_pending_results(self, iterator): diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py index c6ef0a347b9..8f42ef81b57 100644 --- a/lib/ansible/plugins/strategy/linear.py +++ b/lib/ansible/plugins/strategy/linear.py @@ -263,17 +263,15 @@ class StrategyModule(StrategyBase): if run_once: break - # FIXME: probably not required here any more with the result proc - # having been removed, so there's no only a single result - # queue for the main thread - results += self._process_pending_results(iterator, one_pass=True) + results += self._process_pending_results(iterator, max_passes=max(1, int(len(self._tqm._workers) * 0.1))) # go to next host/task group if skip_rest: continue display.debug("done queuing things up, now waiting for results queue to drain") - results += self._wait_on_pending_results(iterator) + if self._pending_results > 0: + results += self._wait_on_pending_results(iterator) host_results.extend(results) all_role_blocks = [] diff --git a/test/units/plugins/strategies/test_strategy_base.py b/test/units/plugins/strategies/test_strategy_base.py index 49fd095bd49..faa6173eb48 100644 --- a/test/units/plugins/strategies/test_strategy_base.py +++ b/test/units/plugins/strategies/test_strategy_base.py @@ -45,16 +45,49 @@ class TestStrategyBase(unittest.TestCase): pass def test_strategy_base_init(self): + queue_items = [] + def _queue_empty(*args, **kwargs): + return len(queue_items) == 0 + def _queue_get(*args, **kwargs): + if len(queue_items) == 0: + raise Queue.Empty + else: + return queue_items.pop() + def _queue_put(item, *args, **kwargs): + queue_items.append(item) + + mock_queue = MagicMock() + mock_queue.empty.side_effect = 
_queue_empty + mock_queue.get.side_effect = _queue_get + mock_queue.put.side_effect = _queue_put + mock_tqm = MagicMock(TaskQueueManager) - mock_tqm._final_q = MagicMock() + mock_tqm._final_q = mock_queue mock_tqm._options = MagicMock() mock_tqm._notified_handlers = {} mock_tqm._listening_handlers = {} strategy_base = StrategyBase(tqm=mock_tqm) + strategy_base.cleanup() def test_strategy_base_run(self): + queue_items = [] + def _queue_empty(*args, **kwargs): + return len(queue_items) == 0 + def _queue_get(*args, **kwargs): + if len(queue_items) == 0: + raise Queue.Empty + else: + return queue_items.pop() + def _queue_put(item, *args, **kwargs): + queue_items.append(item) + + mock_queue = MagicMock() + mock_queue.empty.side_effect = _queue_empty + mock_queue.get.side_effect = _queue_get + mock_queue.put.side_effect = _queue_put + mock_tqm = MagicMock(TaskQueueManager) - mock_tqm._final_q = MagicMock() + mock_tqm._final_q = mock_queue mock_tqm._stats = MagicMock() mock_tqm._notified_handlers = {} mock_tqm._listening_handlers = {} @@ -87,8 +120,25 @@ class TestStrategyBase(unittest.TestCase): mock_tqm._unreachable_hosts = dict(host1=True) mock_iterator.get_failed_hosts.return_value = [] self.assertEqual(strategy_base.run(iterator=mock_iterator, play_context=mock_play_context, result=False), mock_tqm.RUN_UNREACHABLE_HOSTS) + strategy_base.cleanup() def test_strategy_base_get_hosts(self): + queue_items = [] + def _queue_empty(*args, **kwargs): + return len(queue_items) == 0 + def _queue_get(*args, **kwargs): + if len(queue_items) == 0: + raise Queue.Empty + else: + return queue_items.pop() + def _queue_put(item, *args, **kwargs): + queue_items.append(item) + + mock_queue = MagicMock() + mock_queue.empty.side_effect = _queue_empty + mock_queue.get.side_effect = _queue_get + mock_queue.put.side_effect = _queue_put + mock_hosts = [] for i in range(0, 5): mock_host = MagicMock() @@ -100,7 +150,7 @@ class TestStrategyBase(unittest.TestCase): mock_inventory.get_hosts.return_value = mock_hosts mock_tqm = MagicMock() - mock_tqm._final_q = MagicMock() + mock_tqm._final_q = mock_queue mock_tqm._notified_handlers = {} mock_tqm._listening_handlers = {} mock_tqm.get_inventory.return_value = mock_inventory @@ -120,6 +170,7 @@ class TestStrategyBase(unittest.TestCase): mock_tqm._unreachable_hosts = ["host02"] self.assertEqual(strategy_base.get_hosts_remaining(play=mock_play), mock_hosts[2:]) + strategy_base.cleanup() @patch.object(WorkerProcess, 'run') def test_strategy_base_queue_task(self, mock_worker): @@ -178,10 +229,13 @@ class TestStrategyBase(unittest.TestCase): raise Queue.Empty else: return queue_items.pop() + def _queue_put(item, *args, **kwargs): + queue_items.append(item) mock_queue = MagicMock() mock_queue.empty.side_effect = _queue_empty mock_queue.get.side_effect = _queue_get + mock_queue.put.side_effect = _queue_put mock_tqm._final_q = mock_queue mock_tqm._stats = MagicMock() @@ -272,7 +326,7 @@ class TestStrategyBase(unittest.TestCase): strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 mock_iterator.is_failed.return_value = True - results = strategy_base._process_pending_results(iterator=mock_iterator) + results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(results[0], task_result) self.assertEqual(strategy_base._pending_results, 0) @@ -306,7 +360,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, 
return_data=dict(add_host=dict(host_name='newhost01', new_groups=['foo'])))) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 - results = strategy_base._process_pending_results(iterator=mock_iterator) + results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(strategy_base._pending_results, 0) self.assertNotIn('test01', strategy_base._blocked_hosts) @@ -314,7 +368,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_group=dict(group_name='foo')))) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 - results = strategy_base._process_pending_results(iterator=mock_iterator) + results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(strategy_base._pending_results, 0) self.assertNotIn('test01', strategy_base._blocked_hosts) @@ -322,7 +376,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True, _ansible_notify=['test handler']))) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 - results = strategy_base._process_pending_results(iterator=mock_iterator) + results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(strategy_base._pending_results, 0) self.assertNotIn('test01', strategy_base._blocked_hosts) @@ -341,6 +395,7 @@ class TestStrategyBase(unittest.TestCase): #queue_items.append(('bad')) #self.assertRaises(AnsibleError, strategy_base._process_pending_results, iterator=mock_iterator) + strategy_base.cleanup() def test_strategy_base_load_included_file(self): fake_loader = DictDataLoader({ @@ -351,13 +406,30 @@ class TestStrategyBase(unittest.TestCase): """, }) + queue_items = [] + def _queue_empty(*args, **kwargs): + return len(queue_items) == 0 + def _queue_get(*args, **kwargs): + if len(queue_items) == 0: + raise Queue.Empty + else: + return queue_items.pop() + def _queue_put(item, *args, **kwargs): + queue_items.append(item) + + mock_queue = MagicMock() + mock_queue.empty.side_effect = _queue_empty + mock_queue.get.side_effect = _queue_get + mock_queue.put.side_effect = _queue_put + mock_tqm = MagicMock() - mock_tqm._final_q = MagicMock() + mock_tqm._final_q = mock_queue mock_tqm._notified_handlers = {} mock_tqm._listening_handlers = {} strategy_base = StrategyBase(tqm=mock_tqm) strategy_base._loader = fake_loader + strategy_base.cleanup() mock_play = MagicMock() @@ -443,4 +515,5 @@ class TestStrategyBase(unittest.TestCase): result = strategy_base.run_handlers(iterator=mock_iterator, play_context=mock_play_context) finally: + strategy_base.cleanup() tqm.cleanup()
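
--

Below is a rough standalone sketch of the sentinel-terminated results thread that the second commit introduces in lib/ansible/plugins/strategy/__init__.py. The class and helper names here (ResultsCollector, pop_result) are illustrative only and are not part of the patch; the real code wires the thread into StrategyBase and reads TaskResult objects produced by WorkerProcess off the TQM's final queue.

    # Sketch only: background thread drains a multiprocessing queue into a
    # local deque; a sentinel pushed onto the same queue tells it to exit.
    import multiprocessing
    import threading
    import time
    from collections import deque

    _sentinel = object()

    class ResultsCollector:
        def __init__(self, final_q):
            self._final_q = final_q
            self._results = deque()
            self._results_lock = threading.Lock()
            self._thread = threading.Thread(target=self._read_results)
            self._thread.daemon = True
            self._thread.start()

        def _read_results(self):
            while True:
                try:
                    result = self._final_q.get()
                except (IOError, EOFError):
                    break
                # The queue pickles whatever is put on it, so the sentinel that
                # comes out is not the same object that went in; a type check
                # (as in the patch) is used rather than an identity check.
                if type(result) is object:
                    break
                with self._results_lock:
                    self._results.append(result)

        def pop_result(self):
            # Called from the main thread; returns None when nothing is pending.
            with self._results_lock:
                try:
                    return self._results.pop()
                except IndexError:
                    return None

        def cleanup(self):
            # Wake the reader thread out of its blocking get() and wait for it.
            self._final_q.put(_sentinel)
            self._thread.join()

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        collector = ResultsCollector(q)
        q.put({'host': 'web01', 'changed': True})   # stand-in for a TaskResult
        time.sleep(0.1)                             # give the thread time to drain
        print(collector.pop_result())
        collector.cleanup()

Sending the sentinel through the same multiprocessing queue, rather than flipping a flag, guarantees the reader thread wakes from its blocking get() before join() is called; this mirrors how strategy.cleanup() in the patch puts _sentinel onto _final_q and joins the results thread.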