From 78f34786dd468c42d7a222468685590207e74679 Mon Sep 17 00:00:00 2001 From: Matt Martz Date: Thu, 18 Mar 2021 14:12:29 -0500 Subject: [PATCH] Send callbacks directly from the TaskExecutor instead of TaskResults masquerading as callbacks (#73927) --- .../fragments/73899-more-te-callbacks.yml | 5 ++ lib/ansible/executor/task_executor.py | 25 +++++-- lib/ansible/plugins/strategy/__init__.py | 66 +++++++++---------- test/units/plugins/strategy/test_strategy.py | 32 ++++----- 4 files changed, 74 insertions(+), 54 deletions(-) create mode 100644 changelogs/fragments/73899-more-te-callbacks.yml diff --git a/changelogs/fragments/73899-more-te-callbacks.yml b/changelogs/fragments/73899-more-te-callbacks.yml new file mode 100644 index 00000000000..49803259805 --- /dev/null +++ b/changelogs/fragments/73899-more-te-callbacks.yml @@ -0,0 +1,5 @@ +minor_changes: +- Callbacks - Migrate more places in the ``TaskExecutor`` to sending callbacks directly + over the queue, instead of sending them as ``TaskResult`` and short circuiting in the + Strategy to send the callback. This enables closer to real time callbacks of retries + and loop results (https://github.com/ansible/ansible/issues/73899) diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py index 55db1830162..876e8c834b7 100644 --- a/lib/ansible/executor/task_executor.py +++ b/lib/ansible/executor/task_executor.py @@ -382,12 +382,21 @@ class TaskExecutor: 'msg': 'Failed to template loop_control.label: %s' % to_text(e) }) - self._final_q.send_task_result( + tr = TaskResult( self._host.name, self._task._uuid, res, task_fields=task_fields, ) + if tr.is_failed() or tr.is_unreachable(): + self._final_q.send_callback('v2_runner_item_on_failed', tr) + elif tr.is_skipped(): + self._final_q.send_callback('v2_runner_item_on_skipped', tr) + else: + if getattr(self._task, 'diff', False): + self._final_q.send_callback('v2_on_file_diff', tr) + self._final_q.send_callback('v2_runner_item_on_ok', tr) + results.append(res) del task_vars[loop_var] @@ -673,7 +682,15 @@ class TaskExecutor: result['_ansible_retry'] = True result['retries'] = retries display.debug('Retrying task, attempt %d of %d' % (attempt, retries)) - self._final_q.send_task_result(self._host.name, self._task._uuid, result, task_fields=self._task.dump_attrs()) + self._final_q.send_callback( + 'v2_runner_retry', + TaskResult( + self._host.name, + self._task._uuid, + result, + task_fields=self._task.dump_attrs() + ) + ) time.sleep(delay) self._handler = self._get_action_handler(connection=self._connection, templar=templar) else: @@ -782,8 +799,8 @@ class TaskExecutor: self._final_q.send_callback( 'v2_runner_on_async_poll', TaskResult( - self._host, - async_task, + self._host.name, + async_task, # We send the full task here, because the controller knows nothing about it, the TE created it async_result, task_fields=self._task.dump_attrs(), ), diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index a1377f88633..70dea1ea6f3 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -94,8 +94,13 @@ def results_thread_main(strategy): if isinstance(result, StrategySentinel): break elif isinstance(result, CallbackSend): + for arg in result.args: + if isinstance(arg, TaskResult): + strategy.normalize_task_result(arg) + break strategy._tqm.send_callback(result.method_name, *result.args, **result.kwargs) elif isinstance(result, TaskResult): + strategy.normalize_task_result(result) with strategy._results_lock: # only handlers have the listen attr, so this must be a handler # we split up the results into two queues here to make sure @@ -446,6 +451,31 @@ class StrategyBase: for target_host in host_list: _set_host_facts(target_host, always_facts) + def normalize_task_result(self, task_result): + """Normalize a TaskResult to reference actual Host and Task objects + when only given the ``Host.name``, or the ``Task._uuid`` + + Only the ``Host.name`` and ``Task._uuid`` are commonly sent back from + the ``TaskExecutor`` or ``WorkerProcess`` due to performance concerns + + Mutates the original object + """ + + if isinstance(task_result._host, string_types): + # If the value is a string, it is ``Host.name`` + task_result._host = self._inventory.get_host(to_text(task_result._host)) + + if isinstance(task_result._task, string_types): + # If the value is a string, it is ``Task._uuid`` + queue_cache_entry = (task_result._host.name, task_result._task) + found_task = self._queued_task_cache.get(queue_cache_entry)['task'] + original_task = found_task.copy(exclude_parent=True, exclude_tasks=True) + original_task._parent = found_task._parent + original_task.from_attrs(task_result._task_fields) + task_result._task = original_task + + return task_result + @debug_closure def _process_pending_results(self, iterator, one_pass=False, max_passes=None, do_handlers=False): ''' @@ -456,14 +486,6 @@ class StrategyBase: ret_results = [] handler_templar = Templar(self._loader) - def get_original_host(host_name): - # FIXME: this should not need x2 _inventory - host_name = to_text(host_name) - if host_name in self._inventory.hosts: - return self._inventory.hosts[host_name] - else: - return self._inventory.get_host(host_name) - def search_handler_blocks_by_name(handler_name, handler_blocks): # iterate in reversed order since last handler loaded with the same name wins for handler_block in reversed(handler_blocks): @@ -512,32 +534,8 @@ class StrategyBase: finally: self._results_lock.release() - # get the original host and task. We then assign them to the TaskResult for use in callbacks/etc. - original_host = get_original_host(task_result._host) - queue_cache_entry = (original_host.name, task_result._task) - found_task = self._queued_task_cache.get(queue_cache_entry)['task'] - original_task = found_task.copy(exclude_parent=True, exclude_tasks=True) - original_task._parent = found_task._parent - original_task.from_attrs(task_result._task_fields) - - task_result._host = original_host - task_result._task = original_task - - # send callbacks for 'non final' results - if '_ansible_retry' in task_result._result: - self._tqm.send_callback('v2_runner_retry', task_result) - continue - elif '_ansible_item_result' in task_result._result: - if task_result.is_failed() or task_result.is_unreachable(): - self._tqm.send_callback('v2_runner_item_on_failed', task_result) - elif task_result.is_skipped(): - self._tqm.send_callback('v2_runner_item_on_skipped', task_result) - else: - if 'diff' in task_result._result: - if self._diff or getattr(original_task, 'diff', False): - self._tqm.send_callback('v2_on_file_diff', task_result) - self._tqm.send_callback('v2_runner_item_on_ok', task_result) - continue + original_host = task_result._host + original_task = task_result._task # all host status messages contain 2 entries: (msg, task_result) role_ran = False diff --git a/test/units/plugins/strategy/test_strategy.py b/test/units/plugins/strategy/test_strategy.py index 9a2574d279e..bfb694f2fba 100644 --- a/test/units/plugins/strategy/test_strategy.py +++ b/test/units/plugins/strategy/test_strategy.py @@ -20,7 +20,6 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type from units.mock.loader import DictDataLoader -from copy import deepcopy import uuid from units.compat import unittest @@ -254,7 +253,7 @@ class TestStrategyBase(unittest.TestCase): mock_task._parent = None mock_task.ignore_errors = False mock_task.ignore_unreachable = False - mock_task._uuid = uuid.uuid4() + mock_task._uuid = str(uuid.uuid4()) mock_task.loop = None mock_task.copy.return_value = mock_task @@ -319,16 +318,17 @@ class TestStrategyBase(unittest.TestCase): strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 - mock_queued_task_cache = { - (mock_host.name, mock_task._uuid): { - 'task': mock_task, - 'host': mock_host, - 'task_vars': {}, - 'play_context': {}, + def mock_queued_task_cache(): + return { + (mock_host.name, mock_task._uuid): { + 'task': mock_task, + 'host': mock_host, + 'task_vars': {}, + 'play_context': {}, + } } - } - strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache) + strategy_base._queued_task_cache = mock_queued_task_cache() results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(results[0], task_result) @@ -340,7 +340,7 @@ class TestStrategyBase(unittest.TestCase): strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 mock_iterator.is_failed.return_value = True - strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache) + strategy_base._queued_task_cache = mock_queued_task_cache() results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(results[0], task_result) @@ -354,7 +354,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(task_result) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 - strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache) + strategy_base._queued_task_cache = mock_queued_task_cache() results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(results[0], task_result) @@ -367,7 +367,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(task_result) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 - strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache) + strategy_base._queued_task_cache = mock_queued_task_cache() results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(results[0], task_result) @@ -377,7 +377,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_host=dict(host_name='newhost01', new_groups=['foo'])))) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 - strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache) + strategy_base._queued_task_cache = mock_queued_task_cache() results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(strategy_base._pending_results, 0) @@ -386,7 +386,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_group=dict(group_name='foo')))) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 - strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache) + strategy_base._queued_task_cache = mock_queued_task_cache() results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(strategy_base._pending_results, 0) @@ -395,7 +395,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True, _ansible_notify=['test handler']))) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 - strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache) + strategy_base._queued_task_cache = mock_queued_task_cache() results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(strategy_base._pending_results, 0)