Template "original_task" fields in _process_pending_results
Since we no longer use a post-validated task in _process_pending_results, we
need to be sure to template fields used in original_task as they are raw and
may contain variables.
This patch also moves the handler tracking to be per-uuid, not per-object.
Doing it per-object had implications for the above due to the fact that the
copy of the original task is now being used, so the only sure way is to track
based on the uuid instead.
Fixes #18289
(cherry picked from commit dd0257b995
)
This commit is contained in:
parent
e1b459470d
commit
2d8ebbfe8c
3 changed files with 70 additions and 52 deletions
|
@ -138,8 +138,8 @@ class TaskQueueManager:
|
|||
|
||||
# then initialize it with the given handler list
|
||||
for handler in handler_list:
|
||||
if handler not in self._notified_handlers:
|
||||
self._notified_handlers[handler] = []
|
||||
if handler._uuid not in self._notified_handlers:
|
||||
self._notified_handlers[handler._uuid] = []
|
||||
if handler.listen:
|
||||
listeners = handler.listen
|
||||
if not isinstance(listeners, list):
|
||||
|
@ -147,14 +147,7 @@ class TaskQueueManager:
|
|||
for listener in listeners:
|
||||
if listener not in self._listening_handlers:
|
||||
self._listening_handlers[listener] = []
|
||||
|
||||
# if the handler has a name, we append it to the list of listening
|
||||
# handlers, otherwise we use the uuid to avoid trampling on other
|
||||
# nameless listeners
|
||||
if handler.name:
|
||||
self._listening_handlers[listener].append(handler.get_name())
|
||||
else:
|
||||
self._listening_handlers[listener].append(handler._uuid)
|
||||
self._listening_handlers[listener].append(handler._uuid)
|
||||
|
||||
def load_callbacks(self):
|
||||
'''
|
||||
|
|
|
@ -244,7 +244,7 @@ class StrategyBase:
|
|||
else:
|
||||
return self._inventory.get_host(host_name)
|
||||
|
||||
def search_handler_blocks(handler_name, handler_blocks):
|
||||
def search_handler_blocks_by_name(handler_name, handler_blocks):
|
||||
for handler_block in handler_blocks:
|
||||
for handler_task in handler_block.block:
|
||||
if handler_task.name:
|
||||
|
@ -268,11 +268,14 @@ class StrategyBase:
|
|||
# set_fact or some other method, and we don't want to error
|
||||
# out unnecessarily
|
||||
continue
|
||||
else:
|
||||
# if the handler name is not set, we check via the handlers uuid.
|
||||
# this is mainly used by listening handlers only
|
||||
if handler_name == handler_task._uuid:
|
||||
return handler_task
|
||||
return None
|
||||
|
||||
|
||||
def search_handler_blocks_by_uuid(handler_uuid, handler_blocks):
|
||||
for handler_block in handler_blocks:
|
||||
for handler_task in handler_block.block:
|
||||
if handler_uuid == handler_task._uuid:
|
||||
return handler_task
|
||||
return None
|
||||
|
||||
def parent_handler_match(target_handler, handler_name):
|
||||
|
@ -294,6 +297,11 @@ class StrategyBase:
|
|||
else:
|
||||
return False
|
||||
|
||||
# a Templar class to use for templating things later, as we're using
|
||||
# original/non-validated objects here on the manager side. We set the
|
||||
# variables in use later inside the loop below
|
||||
templar = Templar(loader=self._loader)
|
||||
|
||||
cur_pass = 0
|
||||
while True:
|
||||
try:
|
||||
|
@ -304,11 +312,24 @@ class StrategyBase:
|
|||
finally:
|
||||
self._results_lock.release()
|
||||
|
||||
# get the original host and task. We then assign them to the TaskResult for use in callbacks/etc.
|
||||
original_host = get_original_host(task_result._host)
|
||||
original_task = iterator.get_original_task(original_host, task_result._task)
|
||||
|
||||
task_result._host = original_host
|
||||
task_result._task = original_task
|
||||
|
||||
# get the correct loop var for use later
|
||||
if original_task.loop_control:
|
||||
loop_var = original_task.loop_control.loop_var or 'item'
|
||||
else:
|
||||
loop_var = 'item'
|
||||
|
||||
# get the vars for this task/host pair, make them the active set of vars for our templar above
|
||||
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=original_host, task=original_task)
|
||||
self.add_tqm_variables(task_vars, play=iterator._play)
|
||||
templar.set_available_variables(task_vars)
|
||||
|
||||
# send callbacks for 'non final' results
|
||||
if '_ansible_retry' in task_result._result:
|
||||
self._tqm.send_callback('v2_runner_retry', task_result)
|
||||
|
@ -325,8 +346,9 @@ class StrategyBase:
|
|||
self._tqm.send_callback('v2_runner_item_on_ok', task_result)
|
||||
continue
|
||||
|
||||
run_once = templar.template(original_task.run_once)
|
||||
if original_task.register:
|
||||
if original_task.run_once:
|
||||
if run_once:
|
||||
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
|
||||
else:
|
||||
host_list = [original_host]
|
||||
|
@ -342,9 +364,10 @@ class StrategyBase:
|
|||
role_ran = False
|
||||
if task_result.is_failed():
|
||||
role_ran = True
|
||||
if not original_task.ignore_errors:
|
||||
ignore_errors = templar.template(original_task.ignore_errors)
|
||||
if not ignore_errors:
|
||||
display.debug("marking %s as failed" % original_host.name)
|
||||
if original_task.run_once:
|
||||
if run_once:
|
||||
# if we're using run_once, we have to fail every host here
|
||||
for h in self._inventory.get_hosts(iterator._play.hosts):
|
||||
if h.name not in self._tqm._unreachable_hosts:
|
||||
|
@ -377,7 +400,7 @@ class StrategyBase:
|
|||
self._tqm._stats.increment('ok', original_host.name)
|
||||
if 'changed' in task_result._result and task_result._result['changed']:
|
||||
self._tqm._stats.increment('changed', original_host.name)
|
||||
self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors)
|
||||
self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=ignore_errors)
|
||||
elif task_result.is_unreachable():
|
||||
self._tqm._unreachable_hosts[original_host.name] = True
|
||||
iterator._play._removed_hosts.append(original_host.name)
|
||||
|
@ -398,43 +421,46 @@ class StrategyBase:
|
|||
|
||||
for result_item in result_items:
|
||||
if '_ansible_notify' in result_item:
|
||||
print("GOT A NOTIFY")
|
||||
if task_result.is_changed():
|
||||
# The shared dictionary for notified handlers is a proxy, which
|
||||
# does not detect when sub-objects within the proxy are modified.
|
||||
# So, per the docs, we reassign the list so the proxy picks up and
|
||||
# notifies all other threads
|
||||
for handler_name in result_item['_ansible_notify']:
|
||||
print("TRYING TO SEND NOTIFICATION TO HANDLER: %s" % handler_name)
|
||||
found = False
|
||||
# Find the handler using the above helper. First we look up the
|
||||
# dependency chain of the current task (if it's from a role), otherwise
|
||||
# we just look through the list of handlers in the current play/all
|
||||
# roles and use the first one that matches the notify name
|
||||
target_handler = search_handler_blocks(handler_name, iterator._play.handlers)
|
||||
target_handler = search_handler_blocks_by_name(handler_name, iterator._play.handlers)
|
||||
if target_handler is not None:
|
||||
found = True
|
||||
if original_host not in self._notified_handlers[target_handler]:
|
||||
self._notified_handlers[target_handler].append(original_host)
|
||||
if original_host._uuid not in self._notified_handlers[target_handler._uuid]:
|
||||
self._notified_handlers[target_handler._uuid].append(original_host)
|
||||
# FIXME: should this be a callback?
|
||||
display.vv("NOTIFIED HANDLER %s" % (handler_name,))
|
||||
else:
|
||||
# As there may be more than one handler with the notified name as the
|
||||
# parent, so we just keep track of whether or not we found one at all
|
||||
for target_handler in self._notified_handlers:
|
||||
if parent_handler_match(target_handler, handler_name):
|
||||
self._notified_handlers[target_handler].append(original_host)
|
||||
for target_handler_uuid in self._notified_handlers:
|
||||
target_handler = search_handler_blocks_by_uuid(target_handler_uuid, iterator._play.handlers)
|
||||
if target_handler and parent_handler_match(target_handler, handler_name):
|
||||
self._notified_handlers[target_handler._uuid].append(original_host)
|
||||
display.vv("NOTIFIED HANDLER %s" % (target_handler.get_name(),))
|
||||
found = True
|
||||
|
||||
if handler_name in self._listening_handlers:
|
||||
for listening_handler_name in self._listening_handlers[handler_name]:
|
||||
listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers)
|
||||
for listening_handler_uuid in self._listening_handlers[handler_name]:
|
||||
listening_handler = search_handler_blocks_by_uuid(listening_handler_uuid, iterator._play.handlers)
|
||||
if listening_handler is not None:
|
||||
found = True
|
||||
else:
|
||||
continue
|
||||
if original_host not in self._notified_handlers[listening_handler]:
|
||||
self._notified_handlers[listening_handler].append(original_host)
|
||||
display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,))
|
||||
if original_host not in self._notified_handlers[listening_handler._uuid]:
|
||||
self._notified_handlers[listening_handler._uuid].append(original_host)
|
||||
display.vv("NOTIFIED HANDLER %s" % (listening_handler.get_name(),))
|
||||
|
||||
# and if none were found, then we raise an error
|
||||
if not found:
|
||||
|
@ -455,21 +481,11 @@ class StrategyBase:
|
|||
|
||||
elif 'ansible_facts' in result_item:
|
||||
|
||||
# set correct loop var
|
||||
if original_task.loop_control:
|
||||
loop_var = original_task.loop_control.loop_var or 'item'
|
||||
else:
|
||||
loop_var = 'item'
|
||||
|
||||
item = result_item.get(loop_var, None)
|
||||
|
||||
# if delegated fact and we are delegating facts, we need to change target host for them
|
||||
if original_task.delegate_to is not None and original_task.delegate_facts:
|
||||
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=original_host, task=original_task)
|
||||
self.add_tqm_variables(task_vars, play=iterator._play)
|
||||
item = result_item.get(loop_var, None)
|
||||
if item is not None:
|
||||
task_vars[loop_var] = item
|
||||
templar = Templar(loader=self._loader, variables=task_vars)
|
||||
host_name = templar.template(original_task.delegate_to)
|
||||
actual_host = self._inventory.get_host(host_name)
|
||||
if actual_host is None:
|
||||
|
@ -482,7 +498,7 @@ class StrategyBase:
|
|||
# find the host we're actually refering too here, which may
|
||||
# be a host that is not really in inventory at all
|
||||
|
||||
if original_task.run_once:
|
||||
if run_once:
|
||||
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
|
||||
else:
|
||||
host_list = [actual_host]
|
||||
|
@ -490,7 +506,7 @@ class StrategyBase:
|
|||
for target_host in host_list:
|
||||
self._variable_manager.set_host_variable(target_host, var_name, var_value)
|
||||
else:
|
||||
if original_task.run_once:
|
||||
if run_once:
|
||||
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
|
||||
else:
|
||||
host_list = [actual_host]
|
||||
|
@ -719,7 +735,7 @@ class StrategyBase:
|
|||
# but this may take some work in the iterator and gets tricky when
|
||||
# we consider the ability of meta tasks to flush handlers
|
||||
for handler in handler_block.block:
|
||||
if handler in self._notified_handlers and len(self._notified_handlers[handler]):
|
||||
if handler._uuid in self._notified_handlers and len(self._notified_handlers[handler._uuid]):
|
||||
result = self._do_handler_run(handler, handler.get_name(), iterator=iterator, play_context=play_context)
|
||||
if not result:
|
||||
break
|
||||
|
@ -738,7 +754,7 @@ class StrategyBase:
|
|||
handler.name = saved_name
|
||||
|
||||
if notified_hosts is None:
|
||||
notified_hosts = self._notified_handlers[handler]
|
||||
notified_hosts = self._notified_handlers[handler._uuid]
|
||||
|
||||
run_once = False
|
||||
try:
|
||||
|
@ -802,7 +818,7 @@ class StrategyBase:
|
|||
continue
|
||||
|
||||
# wipe the notification list
|
||||
self._notified_handlers[handler] = []
|
||||
self._notified_handlers[handler._uuid] = []
|
||||
display.debug("done running handlers, result is: %s" % result)
|
||||
return result
|
||||
|
||||
|
|
|
@ -250,15 +250,20 @@ class TestStrategyBase(unittest.TestCase):
|
|||
|
||||
mock_task = MagicMock()
|
||||
mock_task._role = None
|
||||
mock_task._parent = None
|
||||
mock_task.ignore_errors = False
|
||||
mock_task._uuid = uuid.uuid4()
|
||||
mock_task.loop = None
|
||||
mock_task.copy.return_value = mock_task
|
||||
|
||||
mock_handler_task = MagicMock(Handler)
|
||||
mock_handler_task.name = 'test handler'
|
||||
mock_handler_task.action = 'foo'
|
||||
mock_handler_task._parent = None
|
||||
mock_handler_task.get_name.return_value = "test handler"
|
||||
mock_handler_task.has_triggered.return_value = False
|
||||
mock_handler_task._uuid = 'xxxxxxxxxxxxx'
|
||||
mock_handler_task.copy.return_value = mock_handler_task
|
||||
|
||||
mock_iterator = MagicMock()
|
||||
mock_iterator._play = mock_play
|
||||
|
@ -272,7 +277,7 @@ class TestStrategyBase(unittest.TestCase):
|
|||
mock_handler_block.always = []
|
||||
mock_play.handlers = [mock_handler_block]
|
||||
|
||||
mock_tqm._notified_handlers = {mock_handler_task: []}
|
||||
mock_tqm._notified_handlers = {mock_handler_task._uuid: []}
|
||||
mock_tqm._listening_handlers = {}
|
||||
|
||||
mock_group = MagicMock()
|
||||
|
@ -298,6 +303,7 @@ class TestStrategyBase(unittest.TestCase):
|
|||
mock_var_mgr = MagicMock()
|
||||
mock_var_mgr.set_host_variable.return_value = None
|
||||
mock_var_mgr.set_host_facts.return_value = None
|
||||
mock_var_mgr.get_vars.return_value = dict()
|
||||
|
||||
strategy_base = StrategyBase(tqm=mock_tqm)
|
||||
strategy_base._inventory = mock_inventory
|
||||
|
@ -307,7 +313,7 @@ class TestStrategyBase(unittest.TestCase):
|
|||
def _has_dead_workers():
|
||||
return False
|
||||
|
||||
strategy_base._tqm.has_dead_workers = _has_dead_workers
|
||||
strategy_base._tqm.has_dead_workers.side_effect = _has_dead_workers
|
||||
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
|
||||
self.assertEqual(len(results), 0)
|
||||
|
||||
|
@ -380,8 +386,8 @@ class TestStrategyBase(unittest.TestCase):
|
|||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(strategy_base._pending_results, 0)
|
||||
self.assertNotIn('test01', strategy_base._blocked_hosts)
|
||||
self.assertIn(mock_handler_task, strategy_base._notified_handlers)
|
||||
self.assertIn(mock_host, strategy_base._notified_handlers[mock_handler_task])
|
||||
self.assertIn(mock_handler_task._uuid, strategy_base._notified_handlers)
|
||||
self.assertIn(mock_host, strategy_base._notified_handlers[mock_handler_task._uuid])
|
||||
|
||||
#queue_items.append(('set_host_var', mock_host, mock_task, None, 'foo', 'bar'))
|
||||
#results = strategy_base._process_pending_results(iterator=mock_iterator)
|
||||
|
@ -440,6 +446,7 @@ class TestStrategyBase(unittest.TestCase):
|
|||
mock_task = MagicMock()
|
||||
mock_task._block = mock_block
|
||||
mock_task._role = None
|
||||
mock_task._parent = None
|
||||
|
||||
mock_iterator = MagicMock()
|
||||
mock_iterator.mark_host_failed.return_value = None
|
||||
|
@ -467,6 +474,8 @@ class TestStrategyBase(unittest.TestCase):
|
|||
mock_handler_task.has_triggered.return_value = False
|
||||
mock_handler_task.listen = None
|
||||
mock_handler_task._role = None
|
||||
mock_handler_task._parent = None
|
||||
mock_handler_task._uuid = 'xxxxxxxxxxxxxxxx'
|
||||
|
||||
mock_handler = MagicMock()
|
||||
mock_handler.block = [mock_handler_task]
|
||||
|
@ -508,7 +517,7 @@ class TestStrategyBase(unittest.TestCase):
|
|||
strategy_base = StrategyBase(tqm=tqm)
|
||||
|
||||
strategy_base._inventory = mock_inventory
|
||||
strategy_base._notified_handlers = {mock_handler_task: [mock_host]}
|
||||
strategy_base._notified_handlers = {mock_handler_task._uuid: [mock_host]}
|
||||
|
||||
task_result = TaskResult(Host('host01'), Handler(), dict(changed=False))
|
||||
tqm._final_q.put(task_result)
|
||||
|
|
Loading…
Reference in a new issue