From 5f953d1129925579b7bf6ea2cce1ccf583199801 Mon Sep 17 00:00:00 2001 From: Pablo Date: Wed, 22 Aug 2018 16:07:31 +0200 Subject: [PATCH] Fix handlers on strategy free (#33011) Add _flushed_hosts dict to store when handlers are flushed and prevent them to be executed too early using _filter_notified_hosts. Add _wait_on_handler_results to wait only for handlers to be completed. Remove only hosts that have been flushed from handler notified list. Fix #31504, #23970 --- lib/ansible/plugins/strategy/__init__.py | 65 +++++++++++++++++++++--- lib/ansible/plugins/strategy/free.py | 9 ++++ 2 files changed, 66 insertions(+), 8 deletions(-) diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index 480bb3ddd87..27862186ee4 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -197,6 +197,10 @@ class StrategyBase: # outstanding tasks still in queue self._blocked_hosts = dict() + # this dictionary is used to keep track of hosts that have + # flushed handlers + self._flushed_hosts = dict() + self._results = deque() self._results_lock = threading.Condition(threading.Lock()) @@ -657,6 +661,35 @@ class StrategyBase: return ret_results + def _wait_on_handler_results(self, iterator, handler, notified_hosts): + ''' + Wait for the handler tasks to complete, using a short sleep + between checks to ensure we don't spin lock + ''' + + ret_results = [] + handler_results = 0 + + display.debug("waiting for handler results...") + while (self._pending_results > 0 and + handler_results < len(notified_hosts) and + not self._tqm._terminated): + + if self._tqm.has_dead_workers(): + raise AnsibleError("A worker was found in a dead state") + + results = self._process_pending_results(iterator) + ret_results.extend(results) + handler_results += len([ + r._host for r in results if r._host in notified_hosts and + r.task_name == handler.name]) + if self._pending_results > 0: + time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL) + + display.debug("no more pending handlers, returning what we have") + + return ret_results + def _wait_on_pending_results(self, iterator): ''' Wait for the shared counter to drop to zero, using a short sleep @@ -855,14 +888,17 @@ class StrategyBase: # self._tqm.send_callback('v2_playbook_on_no_hosts_remaining') # result = False # break - saved_name = handler.name - handler.name = handler_name - self._tqm.send_callback('v2_playbook_on_handler_task_start', handler) - handler.name = saved_name - if notified_hosts is None: notified_hosts = self._notified_handlers[handler._uuid] + notified_hosts = self._filter_notified_hosts(notified_hosts) + + if len(notified_hosts) > 0: + saved_name = handler.name + handler.name = handler_name + self._tqm.send_callback('v2_playbook_on_handler_task_start', handler) + handler.name = saved_name + run_once = False try: action = action_loader.get(handler.action, class_only=True) @@ -883,7 +919,7 @@ class StrategyBase: break # collect the results from the handler run - host_results = self._wait_on_pending_results(iterator) + host_results = self._wait_on_handler_results(iterator, handler, notified_hosts) try: included_files = IncludedFile.process_include_results( @@ -922,11 +958,22 @@ class StrategyBase: display.warning(str(e)) continue - # wipe the notification list - self._notified_handlers[handler._uuid] = [] + # remove hosts from notification list + self._notified_handlers[handler._uuid] = [ + h for h in self._notified_handlers[handler._uuid] + if h not in notified_hosts] display.debug("done running handlers, result is: %s" % result) return result + def _filter_notified_hosts(self, notified_hosts): + ''' + Filter notified hosts accordingly to strategy + ''' + + # As main strategy is linear, we do not filter hosts + # We return a copy to avoid race conditions + return notified_hosts[:] + def _take_step(self, task, host=None): ret = False @@ -974,7 +1021,9 @@ class StrategyBase: elif meta_action == 'flush_handlers': if task.when: self._cond_not_supported_warn(meta_action) + self._flushed_hosts[target_host] = True self.run_handlers(iterator, play_context) + self._flushed_hosts[target_host] = False msg = "ran handlers" elif meta_action == 'refresh_inventory' or self.flush_cache: if task.when: diff --git a/lib/ansible/plugins/strategy/free.py b/lib/ansible/plugins/strategy/free.py index 7d2bfa54e83..d4b61376dab 100644 --- a/lib/ansible/plugins/strategy/free.py +++ b/lib/ansible/plugins/strategy/free.py @@ -49,6 +49,15 @@ except ImportError: class StrategyModule(StrategyBase): + def _filter_notified_hosts(self, notified_hosts): + ''' + Filter notified hosts accordingly to strategy + ''' + + # We act only on hosts that are ready to flush handlers + return [host for host in notified_hosts + if host in self._flushed_hosts and self._flushed_hosts[host]] + def run(self, iterator, play_context): ''' The "free" strategy is a bit more complex, in that it allows tasks to