Consolidate handler tracking (#49338)
* Consolidate handler tracking - Remove unused code. ci_complete - unit test fixes. ci_complete - Restore previous behavior of matching a single handler - when notifying a host for a handler, return True if it was added, False otherwise, to reduce copied logic - rename funcitons for clarity. ci_complete - Remove handler logic for static includes which was disabled previously
This commit is contained in:
parent
62b2a08cfb
commit
2a469fd959
7 changed files with 41 additions and 133 deletions
|
@ -90,10 +90,6 @@ class TaskQueueManager:
|
|||
# a special flag to help us exit cleanly
|
||||
self._terminated = False
|
||||
|
||||
# this dictionary is used to keep track of notified handlers
|
||||
self._notified_handlers = dict()
|
||||
self._listening_handlers = dict()
|
||||
|
||||
# dictionaries to keep track of failed/unreachable hosts
|
||||
self._failed_hosts = dict()
|
||||
self._unreachable_hosts = dict()
|
||||
|
@ -120,11 +116,6 @@ class TaskQueueManager:
|
|||
inventory hostnames for those hosts triggering the handler.
|
||||
'''
|
||||
|
||||
# Zero the dictionary first by removing any entries there.
|
||||
# Proxied dicts don't support iteritems, so we have to use keys()
|
||||
self._notified_handlers.clear()
|
||||
self._listening_handlers.clear()
|
||||
|
||||
def _process_block(b):
|
||||
temp_list = []
|
||||
for t in b.block:
|
||||
|
@ -137,23 +128,6 @@ class TaskQueueManager:
|
|||
handler_list = []
|
||||
for handler_block in play.handlers:
|
||||
handler_list.extend(_process_block(handler_block))
|
||||
# then initialize it with the given handler list
|
||||
self.update_handler_list(handler_list)
|
||||
|
||||
def update_handler_list(self, handler_list):
|
||||
for handler in handler_list:
|
||||
if handler._uuid not in self._notified_handlers:
|
||||
display.debug("Adding handler %s to notified list" % handler.name)
|
||||
self._notified_handlers[handler._uuid] = []
|
||||
if handler.listen:
|
||||
listeners = handler.listen
|
||||
if not isinstance(listeners, list):
|
||||
listeners = [listeners]
|
||||
for listener in listeners:
|
||||
if listener not in self._listening_handlers:
|
||||
self._listening_handlers[listener] = []
|
||||
display.debug("Adding handler %s to listening list" % handler.name)
|
||||
self._listening_handlers[listener].append(handler._uuid)
|
||||
|
||||
def load_callbacks(self):
|
||||
'''
|
||||
|
|
|
@ -25,10 +25,12 @@ from ansible.playbook.task import Task
|
|||
|
||||
class Handler(Task):
|
||||
|
||||
_listen = FieldAttribute(isa='list')
|
||||
_listen = FieldAttribute(isa='list', default=list)
|
||||
|
||||
def __init__(self, block=None, role=None, task_include=None):
|
||||
self._flagged_hosts = []
|
||||
self.notified_hosts = []
|
||||
|
||||
self.cached_name = False
|
||||
|
||||
super(Handler, self).__init__(block=block, role=role, task_include=task_include)
|
||||
|
||||
|
@ -41,13 +43,14 @@ class Handler(Task):
|
|||
t = Handler(block=block, role=role, task_include=task_include)
|
||||
return t.load_data(data, variable_manager=variable_manager, loader=loader)
|
||||
|
||||
def flag_for_host(self, host):
|
||||
# assert instanceof(host, Host)
|
||||
if host not in self._flagged_hosts:
|
||||
self._flagged_hosts.append(host)
|
||||
def notify_host(self, host):
|
||||
if not self.is_host_notified(host):
|
||||
self.notified_hosts.append(host)
|
||||
return True
|
||||
return False
|
||||
|
||||
def has_triggered(self, host):
|
||||
return host in self._flagged_hosts
|
||||
def is_host_notified(self, host):
|
||||
return host in self.notified_hosts
|
||||
|
||||
def serialize(self):
|
||||
result = super(Handler, self).serialize()
|
||||
|
|
|
@ -44,7 +44,6 @@ from ansible.module_utils.connection import Connection, ConnectionError
|
|||
from ansible.playbook.helpers import load_list_of_blocks
|
||||
from ansible.playbook.included_file import IncludedFile
|
||||
from ansible.playbook.task_include import TaskInclude
|
||||
from ansible.playbook.role_include import IncludeRole
|
||||
from ansible.plugins.loader import action_loader, connection_loader, filter_loader, lookup_loader, module_loader, test_loader
|
||||
from ansible.template import Templar
|
||||
from ansible.utils.display import Display
|
||||
|
@ -168,8 +167,6 @@ class StrategyBase:
|
|||
self._tqm = tqm
|
||||
self._inventory = tqm.get_inventory()
|
||||
self._workers = tqm.get_workers()
|
||||
self._notified_handlers = tqm._notified_handlers
|
||||
self._listening_handlers = tqm._listening_handlers
|
||||
self._variable_manager = tqm.get_variable_manager()
|
||||
self._loader = tqm.get_loader()
|
||||
self._final_q = tqm._final_q
|
||||
|
@ -372,19 +369,21 @@ class StrategyBase:
|
|||
for handler_block in reversed(handler_blocks):
|
||||
for handler_task in handler_block.block:
|
||||
if handler_task.name:
|
||||
handler_vars = self._variable_manager.get_vars(play=iterator._play, task=handler_task)
|
||||
templar = Templar(loader=self._loader, variables=handler_vars)
|
||||
if not handler_task.cached_name:
|
||||
handler_vars = self._variable_manager.get_vars(play=iterator._play, task=handler_task)
|
||||
templar = Templar(loader=self._loader, variables=handler_vars)
|
||||
handler_task.name = templar.template(handler_task.name)
|
||||
handler_task.cached_name = True
|
||||
|
||||
try:
|
||||
# first we check with the full result of get_name(), which may
|
||||
# include the role name (if the handler is from a role). If that
|
||||
# is not found, we resort to the simple name field, which doesn't
|
||||
# have anything extra added to it.
|
||||
target_handler_name = templar.template(handler_task.name)
|
||||
if target_handler_name == handler_name:
|
||||
if handler_task.name == handler_name:
|
||||
return handler_task
|
||||
else:
|
||||
target_handler_name = templar.template(handler_task.get_name())
|
||||
if target_handler_name == handler_name:
|
||||
if handler_task.get_name() == handler_name:
|
||||
return handler_task
|
||||
except (UndefinedError, AnsibleUndefinedVariable):
|
||||
# We skip this handler due to the fact that it may be using
|
||||
|
@ -394,33 +393,6 @@ class StrategyBase:
|
|||
continue
|
||||
return None
|
||||
|
||||
def search_handler_blocks_by_uuid(handler_uuid, handler_blocks):
|
||||
# iterate in reversed order since last handler loaded with the same name wins
|
||||
for handler_block in reversed(handler_blocks):
|
||||
for handler_task in handler_block.block:
|
||||
if handler_uuid == handler_task._uuid:
|
||||
return handler_task
|
||||
return None
|
||||
|
||||
def parent_handler_match(target_handler, handler_name):
|
||||
if target_handler:
|
||||
if isinstance(target_handler, (TaskInclude, IncludeRole)) and not getattr(target_handler, 'statically_loaded', True):
|
||||
try:
|
||||
handler_vars = self._variable_manager.get_vars(play=iterator._play, task=target_handler)
|
||||
templar = Templar(loader=self._loader, variables=handler_vars)
|
||||
target_handler_name = templar.template(target_handler.name)
|
||||
if target_handler_name == handler_name:
|
||||
return True
|
||||
else:
|
||||
target_handler_name = templar.template(target_handler.get_name())
|
||||
if target_handler_name == handler_name:
|
||||
return True
|
||||
except (UndefinedError, AnsibleUndefinedVariable):
|
||||
pass
|
||||
return parent_handler_match(target_handler._parent, handler_name)
|
||||
else:
|
||||
return False
|
||||
|
||||
cur_pass = 0
|
||||
while True:
|
||||
try:
|
||||
|
@ -548,31 +520,18 @@ class StrategyBase:
|
|||
target_handler = search_handler_blocks_by_name(handler_name, iterator._play.handlers)
|
||||
if target_handler is not None:
|
||||
found = True
|
||||
if target_handler._uuid not in self._notified_handlers:
|
||||
self._notified_handlers[target_handler._uuid] = []
|
||||
if original_host not in self._notified_handlers[target_handler._uuid]:
|
||||
self._notified_handlers[target_handler._uuid].append(original_host)
|
||||
if target_handler.notify_host(original_host):
|
||||
self._tqm.send_callback('v2_playbook_on_notify', target_handler, original_host)
|
||||
else:
|
||||
# As there may be more than one handler with the notified name as the
|
||||
# parent, so we just keep track of whether or not we found one at all
|
||||
for target_handler_uuid in self._notified_handlers:
|
||||
target_handler = search_handler_blocks_by_uuid(target_handler_uuid, iterator._play.handlers)
|
||||
if target_handler and parent_handler_match(target_handler, handler_name):
|
||||
found = True
|
||||
if original_host not in self._notified_handlers[target_handler._uuid]:
|
||||
self._notified_handlers[target_handler._uuid].append(original_host)
|
||||
self._tqm.send_callback('v2_playbook_on_notify', target_handler, original_host)
|
||||
|
||||
if handler_name in self._listening_handlers:
|
||||
for listening_handler_uuid in self._listening_handlers[handler_name]:
|
||||
listening_handler = search_handler_blocks_by_uuid(listening_handler_uuid, iterator._play.handlers)
|
||||
if listening_handler is not None:
|
||||
found = True
|
||||
else:
|
||||
for listening_handler_block in iterator._play.handlers:
|
||||
for listening_handler in listening_handler_block.block:
|
||||
listeners = getattr(listening_handler, 'listen', []) or []
|
||||
if handler_name not in listeners:
|
||||
continue
|
||||
if original_host not in self._notified_handlers[listening_handler._uuid]:
|
||||
self._notified_handlers[listening_handler._uuid].append(original_host)
|
||||
else:
|
||||
found = True
|
||||
|
||||
if listening_handler.notify_host(original_host):
|
||||
self._tqm.send_callback('v2_playbook_on_notify', listening_handler, original_host)
|
||||
|
||||
# and if none were found, then we raise an error
|
||||
|
@ -877,14 +836,9 @@ class StrategyBase:
|
|||
# but this may take some work in the iterator and gets tricky when
|
||||
# we consider the ability of meta tasks to flush handlers
|
||||
for handler in handler_block.block:
|
||||
if handler._uuid in self._notified_handlers and len(self._notified_handlers[handler._uuid]):
|
||||
if handler.notified_hosts:
|
||||
handler_vars = self._variable_manager.get_vars(play=iterator._play, task=handler)
|
||||
templar = Templar(loader=self._loader, variables=handler_vars)
|
||||
handler_name = handler.get_name()
|
||||
try:
|
||||
handler_name = templar.template(handler_name)
|
||||
except (UndefinedError, AnsibleUndefinedVariable):
|
||||
pass
|
||||
result = self._do_handler_run(handler, handler_name, iterator=iterator, play_context=play_context)
|
||||
if not result:
|
||||
break
|
||||
|
@ -898,7 +852,7 @@ class StrategyBase:
|
|||
# result = False
|
||||
# break
|
||||
if notified_hosts is None:
|
||||
notified_hosts = self._notified_handlers[handler._uuid]
|
||||
notified_hosts = handler.notified_hosts[:]
|
||||
|
||||
notified_hosts = self._filter_notified_hosts(notified_hosts)
|
||||
|
||||
|
@ -920,7 +874,7 @@ class StrategyBase:
|
|||
|
||||
host_results = []
|
||||
for host in notified_hosts:
|
||||
if not handler.has_triggered(host) and (not iterator.is_failed(host) or play_context.force_handlers):
|
||||
if not iterator.is_failed(host) or play_context.force_handlers:
|
||||
task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=handler)
|
||||
self.add_tqm_variables(task_vars, play=iterator._play)
|
||||
self._queue_task(host, handler, task_vars, play_context)
|
||||
|
@ -953,7 +907,7 @@ class StrategyBase:
|
|||
for task in block.block:
|
||||
task_name = task.get_name()
|
||||
display.debug("adding task '%s' included in handler '%s'" % (task_name, handler_name))
|
||||
self._notified_handlers[task._uuid] = included_file._hosts[:]
|
||||
task.notified_hosts = included_file._hosts[:]
|
||||
result = self._do_handler_run(
|
||||
handler=task,
|
||||
handler_name=task_name,
|
||||
|
@ -971,8 +925,8 @@ class StrategyBase:
|
|||
continue
|
||||
|
||||
# remove hosts from notification list
|
||||
self._notified_handlers[handler._uuid] = [
|
||||
h for h in self._notified_handlers[handler._uuid]
|
||||
handler.notified_hosts = [
|
||||
h for h in handler.notified_hosts
|
||||
if h not in notified_hosts]
|
||||
display.debug("done running handlers, result is: %s" % result)
|
||||
return result
|
||||
|
|
|
@ -225,7 +225,6 @@ class StrategyModule(StrategyBase):
|
|||
variable_manager=self._variable_manager,
|
||||
loader=self._loader,
|
||||
)
|
||||
self._tqm.update_handler_list([handler for handler_block in handler_blocks for handler in handler_block.block])
|
||||
else:
|
||||
new_blocks = self._load_included_file(included_file, iterator=iterator)
|
||||
except AnsibleError as e:
|
||||
|
|
|
@ -355,7 +355,6 @@ class StrategyModule(StrategyBase):
|
|||
variable_manager=self._variable_manager,
|
||||
loader=self._loader,
|
||||
)
|
||||
self._tqm.update_handler_list([handler for handler_block in handler_blocks for handler in handler_block.block])
|
||||
else:
|
||||
new_blocks = self._load_included_file(included_file, iterator=iterator)
|
||||
|
||||
|
|
|
@ -272,7 +272,7 @@ class TestLoadListOfTasks(unittest.TestCase, MixinForMocks):
|
|||
self.assertIsInstance(res[0], Handler)
|
||||
|
||||
# default for Handler
|
||||
self.assertEquals(res[0].listen, None)
|
||||
self.assertEquals(res[0].listen, [])
|
||||
|
||||
# TODO/FIXME: this doesn't seen right
|
||||
# figure out how to get the non-static errors to be raised, this seems to just ignore everything
|
||||
|
|
|
@ -67,8 +67,6 @@ class TestStrategyBase(unittest.TestCase):
|
|||
mock_tqm = MagicMock(TaskQueueManager)
|
||||
mock_tqm._final_q = mock_queue
|
||||
mock_tqm._options = MagicMock()
|
||||
mock_tqm._notified_handlers = {}
|
||||
mock_tqm._listening_handlers = {}
|
||||
strategy_base = StrategyBase(tqm=mock_tqm)
|
||||
strategy_base.cleanup()
|
||||
|
||||
|
@ -95,8 +93,6 @@ class TestStrategyBase(unittest.TestCase):
|
|||
mock_tqm = MagicMock(TaskQueueManager)
|
||||
mock_tqm._final_q = mock_queue
|
||||
mock_tqm._stats = MagicMock()
|
||||
mock_tqm._notified_handlers = {}
|
||||
mock_tqm._listening_handlers = {}
|
||||
mock_tqm.send_callback.return_value = None
|
||||
|
||||
for attr in ('RUN_OK', 'RUN_ERROR', 'RUN_FAILED_HOSTS', 'RUN_UNREACHABLE_HOSTS'):
|
||||
|
@ -111,8 +107,6 @@ class TestStrategyBase(unittest.TestCase):
|
|||
mock_tqm._failed_hosts = dict()
|
||||
mock_tqm._unreachable_hosts = dict()
|
||||
mock_tqm._options = MagicMock()
|
||||
mock_tqm._notified_handlers = {}
|
||||
mock_tqm._listening_handlers = {}
|
||||
strategy_base = StrategyBase(tqm=mock_tqm)
|
||||
|
||||
mock_host = MagicMock()
|
||||
|
@ -160,8 +154,6 @@ class TestStrategyBase(unittest.TestCase):
|
|||
|
||||
mock_tqm = MagicMock()
|
||||
mock_tqm._final_q = mock_queue
|
||||
mock_tqm._notified_handlers = {}
|
||||
mock_tqm._listening_handlers = {}
|
||||
mock_tqm.get_inventory.return_value = mock_inventory
|
||||
|
||||
mock_play = MagicMock()
|
||||
|
@ -231,8 +223,6 @@ class TestStrategyBase(unittest.TestCase):
|
|||
mock_tqm._failed_hosts = dict()
|
||||
mock_tqm._unreachable_hosts = dict()
|
||||
mock_tqm.send_callback.return_value = None
|
||||
mock_tqm._notified_handlers = {}
|
||||
mock_tqm._listening_handlers = {}
|
||||
|
||||
queue_items = []
|
||||
|
||||
|
@ -274,14 +264,11 @@ class TestStrategyBase(unittest.TestCase):
|
|||
mock_task.loop = None
|
||||
mock_task.copy.return_value = mock_task
|
||||
|
||||
mock_handler_task = MagicMock(Handler)
|
||||
mock_handler_task = Handler()
|
||||
mock_handler_task.name = 'test handler'
|
||||
mock_handler_task.action = 'foo'
|
||||
mock_handler_task._parent = None
|
||||
mock_handler_task.get_name.return_value = "test handler"
|
||||
mock_handler_task.has_triggered.return_value = False
|
||||
mock_handler_task._uuid = 'xxxxxxxxxxxxx'
|
||||
mock_handler_task.copy.return_value = mock_handler_task
|
||||
|
||||
mock_iterator = MagicMock()
|
||||
mock_iterator._play = mock_play
|
||||
|
@ -294,9 +281,6 @@ class TestStrategyBase(unittest.TestCase):
|
|||
mock_handler_block.always = []
|
||||
mock_play.handlers = [mock_handler_block]
|
||||
|
||||
mock_tqm._notified_handlers = {mock_handler_task._uuid: []}
|
||||
mock_tqm._listening_handlers = {}
|
||||
|
||||
mock_group = MagicMock()
|
||||
mock_group.add_host.return_value = None
|
||||
|
||||
|
@ -422,8 +406,7 @@ class TestStrategyBase(unittest.TestCase):
|
|||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(strategy_base._pending_results, 0)
|
||||
self.assertNotIn('test01', strategy_base._blocked_hosts)
|
||||
self.assertIn(mock_handler_task._uuid, strategy_base._notified_handlers)
|
||||
self.assertIn(mock_host, strategy_base._notified_handlers[mock_handler_task._uuid])
|
||||
self.assertTrue(mock_handler_task.is_host_notified(mock_host))
|
||||
|
||||
# queue_items.append(('set_host_var', mock_host, mock_task, None, 'foo', 'bar'))
|
||||
# results = strategy_base._process_pending_results(iterator=mock_iterator)
|
||||
|
@ -469,8 +452,6 @@ class TestStrategyBase(unittest.TestCase):
|
|||
|
||||
mock_tqm = MagicMock()
|
||||
mock_tqm._final_q = mock_queue
|
||||
mock_tqm._notified_handlers = {}
|
||||
mock_tqm._listening_handlers = {}
|
||||
|
||||
strategy_base = StrategyBase(tqm=mock_tqm)
|
||||
strategy_base._loader = fake_loader
|
||||
|
@ -507,15 +488,14 @@ class TestStrategyBase(unittest.TestCase):
|
|||
mock_worker.side_effect = fake_run
|
||||
mock_play_context = MagicMock()
|
||||
|
||||
mock_handler_task = MagicMock(Handler)
|
||||
mock_handler_task = Handler()
|
||||
mock_handler_task.action = 'foo'
|
||||
mock_handler_task.get_name.return_value = "test handler"
|
||||
mock_handler_task.has_triggered.return_value = False
|
||||
mock_handler_task.listen = None
|
||||
mock_handler_task.cached_name = False
|
||||
mock_handler_task.name = "test handler"
|
||||
mock_handler_task.listen = []
|
||||
mock_handler_task._role = None
|
||||
mock_handler_task._parent = None
|
||||
mock_handler_task._uuid = 'xxxxxxxxxxxxxxxx'
|
||||
mock_handler_task.copy.return_value = mock_handler_task
|
||||
|
||||
mock_handler = MagicMock()
|
||||
mock_handler.block = [mock_handler_task]
|
||||
|
@ -558,7 +538,6 @@ class TestStrategyBase(unittest.TestCase):
|
|||
strategy_base = StrategyBase(tqm=tqm)
|
||||
|
||||
strategy_base._inventory = mock_inventory
|
||||
strategy_base._notified_handlers = {mock_handler_task._uuid: [mock_host]}
|
||||
|
||||
task_result = TaskResult(mock_host.name, mock_handler_task._uuid, dict(changed=False))
|
||||
strategy_base._queued_task_cache = dict()
|
||||
|
|
Loading…
Reference in a new issue