Fix handlers on strategy free (#33011)

Add _flushed_hosts dict to store when handlers are flushed and prevent them
to be executed too early using _filter_notified_hosts.

Add _wait_on_handler_results to wait only for handlers to be completed.

Remove only hosts that have been flushed from handler notified list.

Fix #31504, #23970
This commit is contained in:
Pablo 2018-08-22 16:07:31 +02:00 committed by Brian Coca
parent ec2aa12581
commit 5f953d1129
2 changed files with 66 additions and 8 deletions

View file

@ -197,6 +197,10 @@ class StrategyBase:
# outstanding tasks still in queue
self._blocked_hosts = dict()
# this dictionary is used to keep track of hosts that have
# flushed handlers
self._flushed_hosts = dict()
self._results = deque()
self._results_lock = threading.Condition(threading.Lock())
@ -657,6 +661,35 @@ class StrategyBase:
return ret_results
def _wait_on_handler_results(self, iterator, handler, notified_hosts):
'''
Wait for the handler tasks to complete, using a short sleep
between checks to ensure we don't spin lock
'''
ret_results = []
handler_results = 0
display.debug("waiting for handler results...")
while (self._pending_results > 0 and
handler_results < len(notified_hosts) and
not self._tqm._terminated):
if self._tqm.has_dead_workers():
raise AnsibleError("A worker was found in a dead state")
results = self._process_pending_results(iterator)
ret_results.extend(results)
handler_results += len([
r._host for r in results if r._host in notified_hosts and
r.task_name == handler.name])
if self._pending_results > 0:
time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL)
display.debug("no more pending handlers, returning what we have")
return ret_results
def _wait_on_pending_results(self, iterator):
'''
Wait for the shared counter to drop to zero, using a short sleep
@ -855,14 +888,17 @@ class StrategyBase:
# self._tqm.send_callback('v2_playbook_on_no_hosts_remaining')
# result = False
# break
saved_name = handler.name
handler.name = handler_name
self._tqm.send_callback('v2_playbook_on_handler_task_start', handler)
handler.name = saved_name
if notified_hosts is None:
notified_hosts = self._notified_handlers[handler._uuid]
notified_hosts = self._filter_notified_hosts(notified_hosts)
if len(notified_hosts) > 0:
saved_name = handler.name
handler.name = handler_name
self._tqm.send_callback('v2_playbook_on_handler_task_start', handler)
handler.name = saved_name
run_once = False
try:
action = action_loader.get(handler.action, class_only=True)
@ -883,7 +919,7 @@ class StrategyBase:
break
# collect the results from the handler run
host_results = self._wait_on_pending_results(iterator)
host_results = self._wait_on_handler_results(iterator, handler, notified_hosts)
try:
included_files = IncludedFile.process_include_results(
@ -922,11 +958,22 @@ class StrategyBase:
display.warning(str(e))
continue
# wipe the notification list
self._notified_handlers[handler._uuid] = []
# remove hosts from notification list
self._notified_handlers[handler._uuid] = [
h for h in self._notified_handlers[handler._uuid]
if h not in notified_hosts]
display.debug("done running handlers, result is: %s" % result)
return result
def _filter_notified_hosts(self, notified_hosts):
'''
Filter notified hosts accordingly to strategy
'''
# As main strategy is linear, we do not filter hosts
# We return a copy to avoid race conditions
return notified_hosts[:]
def _take_step(self, task, host=None):
ret = False
@ -974,7 +1021,9 @@ class StrategyBase:
elif meta_action == 'flush_handlers':
if task.when:
self._cond_not_supported_warn(meta_action)
self._flushed_hosts[target_host] = True
self.run_handlers(iterator, play_context)
self._flushed_hosts[target_host] = False
msg = "ran handlers"
elif meta_action == 'refresh_inventory' or self.flush_cache:
if task.when:

View file

@ -49,6 +49,15 @@ except ImportError:
class StrategyModule(StrategyBase):
def _filter_notified_hosts(self, notified_hosts):
'''
Filter notified hosts accordingly to strategy
'''
# We act only on hosts that are ready to flush handlers
return [host for host in notified_hosts
if host in self._flushed_hosts and self._flushed_hosts[host]]
def run(self, iterator, play_context):
'''
The "free" strategy is a bit more complex, in that it allows tasks to