# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.

# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import cmd
import functools
import os
import pprint
import sys
import threading
import time

from collections import deque
from multiprocessing import Lock
from jinja2.exceptions import UndefinedError

from ansible import constants as C
from ansible import context
from ansible.errors import AnsibleError, AnsibleFileNotFound, AnsibleParserError, AnsibleUndefinedVariable
from ansible.executor import action_write_locks
from ansible.executor.process.worker import WorkerProcess
from ansible.executor.task_result import TaskResult
from ansible.executor.task_queue_manager import CallbackSend
from ansible.module_utils.six.moves import queue as Queue
from ansible.module_utils.six import iteritems, itervalues, string_types
from ansible.module_utils._text import to_text
from ansible.module_utils.connection import Connection, ConnectionError
from ansible.playbook.conditional import Conditional
from ansible.playbook.handler import Handler
from ansible.playbook.helpers import load_list_of_blocks
from ansible.playbook.included_file import IncludedFile
from ansible.playbook.task_include import TaskInclude
from ansible.plugins import loader as plugin_loader
from ansible.template import Templar
from ansible.utils.display import Display
from ansible.utils.vars import combine_vars
from ansible.vars.clean import strip_internal_keys, module_response_deepcopy

display = Display()

__all__ = ['StrategyBase']

# Entries in this list match either exactly or as a start-of-string prefix;
# regular expressions are not accepted.
ALWAYS_DELEGATE_FACT_PREFIXES = frozenset((
    'discovered_interpreter_',
))


class StrategySentinel:
    pass


_sentinel = StrategySentinel()


def post_process_whens(result, task, templar):

    cond = None
    if task.changed_when:
        cond = Conditional(loader=templar._loader)
        cond.when = task.changed_when
        result['changed'] = cond.evaluate_conditional(templar, templar.available_variables)

    if task.failed_when:
        if cond is None:
            cond = Conditional(loader=templar._loader)
        cond.when = task.failed_when
        failed_when_result = cond.evaluate_conditional(templar, templar.available_variables)
        result['failed_when_result'] = result['failed'] = failed_when_result
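
# Example (illustrative only, using a hypothetical registered variable `out`):
# for a task with `changed_when: false` and `failed_when: "'ERROR' in out.stderr"`,
# the helper above overwrites result['changed'] with False and sets both
# result['failed_when_result'] and result['failed'] from the evaluated
# failed_when conditional.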


def results_thread_main(strategy):
    while True:
        try:
            result = strategy._final_q.get()
            if isinstance(result, StrategySentinel):
                break
            elif isinstance(result, CallbackSend):
                strategy._tqm.send_callback(result.method_name, *result.args, **result.kwargs)
            elif isinstance(result, TaskResult):
                with strategy._results_lock:
                    # only handlers have the listen attr, so this must be a handler
                    # we split up the results into two queues here to make sure
                    # handler and regular result processing don't cross wires
                    if 'listen' in result._task_fields:
                        strategy._handler_results.append(result)
                    else:
                        strategy._results.append(result)
            else:
                display.warning('Received an invalid object (%s) in the result queue: %r' % (type(result), result))
        except (IOError, EOFError):
            break
        except Queue.Empty:
            pass
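
# results_thread_main() runs in a daemon thread (started from
# StrategyBase.__init__) and is the single consumer of the final queue, so
# worker processes can post results without blocking on strategy-side work.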


def debug_closure(func):
    """Closure to wrap ``StrategyBase._process_pending_results`` and invoke the task debugger"""
    @functools.wraps(func)
    def inner(self, iterator, one_pass=False, max_passes=None, do_handlers=False):
        status_to_stats_map = (
            ('is_failed', 'failures'),
            ('is_unreachable', 'dark'),
            ('is_changed', 'changed'),
            ('is_skipped', 'skipped'),
        )

        # We don't know the host yet, copy the previous states, for lookup after we process new results
        prev_host_states = iterator._host_states.copy()

        results = func(self, iterator, one_pass=one_pass, max_passes=max_passes, do_handlers=do_handlers)
        _processed_results = []

        for result in results:
            task = result._task
            host = result._host
            _queued_task_args = self._queued_task_cache.pop((host.name, task._uuid), None)
            task_vars = _queued_task_args['task_vars']
            play_context = _queued_task_args['play_context']
            # Try to grab the previous host state, if it doesn't exist use get_host_state to generate an empty state
            try:
                prev_host_state = prev_host_states[host.name]
            except KeyError:
                prev_host_state = iterator.get_host_state(host)

            while result.needs_debugger(globally_enabled=self.debugger_active):
                next_action = NextAction()
                dbg = Debugger(task, host, task_vars, play_context, result, next_action)
                dbg.cmdloop()

                if next_action.result == NextAction.REDO:
                    # rollback host state
                    self._tqm.clear_failed_hosts()
                    iterator._host_states[host.name] = prev_host_state
                    for method, what in status_to_stats_map:
                        if getattr(result, method)():
                            self._tqm._stats.decrement(what, host.name)
                    self._tqm._stats.decrement('ok', host.name)

                    # redo
                    self._queue_task(host, task, task_vars, play_context)

                    _processed_results.extend(debug_closure(func)(self, iterator, one_pass))
                    break
                elif next_action.result == NextAction.CONTINUE:
                    _processed_results.append(result)
                    break
                elif next_action.result == NextAction.EXIT:
                    # Matches KeyboardInterrupt from bin/ansible
                    sys.exit(99)
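
            # Note the while/else: the else branch runs only when the loop
            # condition goes false without a break, i.e. the result does not
            # (or no longer does) need the debugger and is kept as-is.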
            else:
                _processed_results.append(result)

        return _processed_results
    return inner


class StrategyBase:

    '''
    This is the base class for strategy plugins, which contains some common
    code useful to all strategies like running handlers, cleanup actions, etc.
    '''

    # by default, strategies should support throttling but we allow individual
    # strategies to disable this and either forego supporting it or managing
    # the throttling internally (as `free` does)
    ALLOW_BASE_THROTTLING = True

    def __init__(self, tqm):
        self._tqm = tqm
        self._inventory = tqm.get_inventory()
        self._workers = tqm._workers
        self._variable_manager = tqm.get_variable_manager()
        self._loader = tqm.get_loader()
        self._final_q = tqm._final_q
        self._step = context.CLIARGS.get('step', False)
        self._diff = context.CLIARGS.get('diff', False)

        # the task cache is a dictionary of tuples of (host.name, task._uuid)
        # used to find the original task object of in-flight tasks and to store
        # the task args/vars and play context info used to queue the task.
        self._queued_task_cache = {}

        # Backwards compat: self._display isn't really needed, just import the global display and use that.
        self._display = display

        # internal counters
        self._pending_results = 0
        self._pending_handler_results = 0
        self._cur_worker = 0

        # this dictionary is used to keep track of hosts that have
        # outstanding tasks still in queue
        self._blocked_hosts = dict()

        # this dictionary is used to keep track of hosts that have
        # flushed handlers
        self._flushed_hosts = dict()

        self._results = deque()
        self._handler_results = deque()
        self._results_lock = threading.Condition(threading.Lock())

        # create the result processing thread for reading results in the background
        self._results_thread = threading.Thread(target=results_thread_main, args=(self,))
        self._results_thread.daemon = True
        self._results_thread.start()

        # holds the list of active (persistent) connections to be shutdown at
        # play completion
        self._active_connections = dict()

        # Caches for get_host calls, to avoid calling excessively
        # These values should be set at the top of the ``run`` method of each
        # strategy plugin. Use ``_set_hosts_cache`` to set these values
        self._hosts_cache = []
        self._hosts_cache_all = []

        self.debugger_active = C.ENABLE_TASK_DEBUGGER

    def _set_hosts_cache(self, play, refresh=True):
        """Responsible for setting _hosts_cache and _hosts_cache_all

        See comment in ``__init__`` for the purpose of these caches
        """
        if not refresh and all((self._hosts_cache, self._hosts_cache_all)):
            return

        if Templar(None).is_template(play.hosts):
            _pattern = 'all'
        else:
            _pattern = play.hosts or 'all'
        self._hosts_cache_all = [h.name for h in self._inventory.get_hosts(pattern=_pattern, ignore_restrictions=True)]
        self._hosts_cache = [h.name for h in self._inventory.get_hosts(play.hosts, order=play.order)]

    def cleanup(self):
        # close active persistent connections
        for sock in itervalues(self._active_connections):
            try:
                conn = Connection(sock)
                conn.reset()
            except ConnectionError as e:
                # most likely socket is already closed
                display.debug("got an error while closing persistent connection: %s" % e)
        self._final_q.put(_sentinel)
        self._results_thread.join()

    def run(self, iterator, play_context, result=0):
        # execute one more pass through the iterator without peeking, to
        # make sure that all of the hosts are advanced to their final task.
        # This should be safe, as everything should be ITERATING_COMPLETE by
        # this point, though the strategy may not advance the hosts itself.

        for host in self._hosts_cache:
            if host not in self._tqm._unreachable_hosts:
                try:
                    iterator.get_next_task_for_host(self._inventory.hosts[host])
                except KeyError:
                    iterator.get_next_task_for_host(self._inventory.get_host(host))

        # save the failed/unreachable hosts, as the run_handlers()
        # method will clear that information during its execution
        failed_hosts = iterator.get_failed_hosts()
        unreachable_hosts = self._tqm._unreachable_hosts.keys()

        display.debug("running handlers")
        handler_result = self.run_handlers(iterator, play_context)
        if isinstance(handler_result, bool) and not handler_result:
            result |= self._tqm.RUN_ERROR
        elif not handler_result:
            result |= handler_result

        # now update with the hosts (if any) that failed or were
        # unreachable during the handler execution phase
        failed_hosts = set(failed_hosts).union(iterator.get_failed_hosts())
        unreachable_hosts = set(unreachable_hosts).union(self._tqm._unreachable_hosts.keys())

        # return the appropriate code, depending on the status of the hosts after the run
        if not isinstance(result, bool) and result != self._tqm.RUN_OK:
            return result
        elif len(unreachable_hosts) > 0:
            return self._tqm.RUN_UNREACHABLE_HOSTS
        elif len(failed_hosts) > 0:
            return self._tqm.RUN_FAILED_HOSTS
        else:
            return self._tqm.RUN_OK

    def get_hosts_remaining(self, play):
        self._set_hosts_cache(play, refresh=False)
        ignore = set(self._tqm._failed_hosts).union(self._tqm._unreachable_hosts)
        return [host for host in self._hosts_cache if host not in ignore]

    def get_failed_hosts(self, play):
        self._set_hosts_cache(play, refresh=False)
        return [host for host in self._hosts_cache if host in self._tqm._failed_hosts]

    def add_tqm_variables(self, vars, play):
        '''
        Base class method to add extra variables/information to the list of task
        vars sent through the executor engine regarding the task queue manager state.
        '''
        vars['ansible_current_hosts'] = self.get_hosts_remaining(play)
        vars['ansible_failed_hosts'] = self.get_failed_hosts(play)

    def _queue_task(self, host, task, task_vars, play_context):
        ''' handles queueing the task up to be sent to a worker '''

        display.debug("entering _queue_task() for %s/%s" % (host.name, task.action))

        # Add a write lock for tasks.
        # Maybe this should be added somewhere further up the call stack but
        # this is the earliest in the code where we have task (1) extracted
        # into its own variable and (2) there's only a single code path
        # leading to the module being run. This is called by three
        # functions: __init__.py::_do_handler_run(), linear.py::run(), and
        # free.py::run() so we'd have to add to all three to do it there.
        # The next common higher level is __init__.py::run() and that has
        # tasks inside of play_iterator so we'd have to extract them to do it
        # there.

        if task.action not in action_write_locks.action_write_locks:
            display.debug('Creating lock for %s' % task.action)
            action_write_locks.action_write_locks[task.action] = Lock()

        # create a templar and template things we need later for the queuing process
        templar = Templar(loader=self._loader, variables=task_vars)

        try:
            throttle = int(templar.template(task.throttle))
        except Exception as e:
            raise AnsibleError("Failed to convert the throttle value to an integer.", obj=task._ds, orig_exc=e)

        # and then queue the new task
        try:
            # Determine the "rewind point" of the worker list. This means we start
            # iterating over the list of workers until the end of the list is found.
            # Normally, that is simply the length of the workers list (as determined
            # by the forks or serial setting), however a task/block/play may "throttle"
            # that limit down.
            rewind_point = len(self._workers)
            if throttle > 0 and self.ALLOW_BASE_THROTTLING:
                if task.run_once:
                    display.debug("Ignoring 'throttle' as 'run_once' is also set for '%s'" % task.get_name())
                else:
                    if throttle <= rewind_point:
                        display.debug("task: %s, throttle: %d" % (task.get_name(), throttle))
                        rewind_point = throttle

            queued = False
            starting_worker = self._cur_worker
            while True:
                if self._cur_worker >= rewind_point:
                    self._cur_worker = 0

                worker_prc = self._workers[self._cur_worker]
                if worker_prc is None or not worker_prc.is_alive():
                    self._queued_task_cache[(host.name, task._uuid)] = {
                        'host': host,
                        'task': task,
                        'task_vars': task_vars,
                        'play_context': play_context
                    }

                    worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, plugin_loader)
                    self._workers[self._cur_worker] = worker_prc
                    self._tqm.send_callback('v2_runner_on_start', host, task)
                    worker_prc.start()
                    display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers)))
                    queued = True

                self._cur_worker += 1

                if self._cur_worker >= rewind_point:
                    self._cur_worker = 0

                if queued:
                    break
                elif self._cur_worker == starting_worker:
                    time.sleep(0.0001)

            if isinstance(task, Handler):
                self._pending_handler_results += 1
            else:
                self._pending_results += 1
        except (EOFError, IOError, AssertionError) as e:
            # most likely an abort
            display.debug("got an error while queuing: %s" % e)
            return
        display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action))
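
    # Example (illustrative): with forks=10 and a task that sets `throttle: 3`,
    # rewind_point above drops from 10 to 3, so _cur_worker only cycles over
    # workers 0-2 and at most three copies of the task run at once.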

    def get_task_hosts(self, iterator, task_host, task):
        if task.run_once:
            host_list = [host for host in self._hosts_cache if host not in self._tqm._unreachable_hosts]
        else:
            host_list = [task_host.name]
        return host_list

    def get_delegated_hosts(self, result, task):
        host_name = result.get('_ansible_delegated_vars', {}).get('ansible_delegated_host', None)
        return [host_name or task.delegate_to]

    def _set_always_delegated_facts(self, result, task):
        """Sets host facts for ``delegate_to`` hosts for facts that should
        always be delegated

        This operation mutates ``result`` to remove the always delegated facts

        See ``ALWAYS_DELEGATE_FACT_PREFIXES``
        """
        if task.delegate_to is None:
            return

        facts = result['ansible_facts']
        always_keys = set()
        _add = always_keys.add
        for fact_key in facts:
            for always_key in ALWAYS_DELEGATE_FACT_PREFIXES:
                if fact_key.startswith(always_key):
                    _add(fact_key)
        if always_keys:
            _pop = facts.pop
            always_facts = {
                'ansible_facts': dict((k, _pop(k)) for k in list(facts) if k in always_keys)
            }
            host_list = self.get_delegated_hosts(result, task)
            _set_host_facts = self._variable_manager.set_host_facts
            for target_host in host_list:
                _set_host_facts(target_host, always_facts)
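
    # Example (illustrative): interpreter discovery produces facts such as
    # 'discovered_interpreter_python'; that prefix matches
    # ALWAYS_DELEGATE_FACT_PREFIXES, so the fact is popped from the result and
    # recorded against the delegated host instead of the task host.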

    @debug_closure
    def _process_pending_results(self, iterator, one_pass=False, max_passes=None, do_handlers=False):
        '''
        Reads results off the final queue and takes appropriate action
        based on the result (executing callbacks, updating state, etc.).
        '''

        ret_results = []
        handler_templar = Templar(self._loader)

        def get_original_host(host_name):
            # FIXME: this should not need x2 _inventory
            host_name = to_text(host_name)
            if host_name in self._inventory.hosts:
                return self._inventory.hosts[host_name]
            else:
                return self._inventory.get_host(host_name)

        def search_handler_blocks_by_name(handler_name, handler_blocks):
            # iterate in reversed order since last handler loaded with the same name wins
            for handler_block in reversed(handler_blocks):
                for handler_task in handler_block.block:
                    if handler_task.name:
                        if not handler_task.cached_name:
                            if handler_templar.is_template(handler_task.name):
                                handler_templar.available_variables = self._variable_manager.get_vars(play=iterator._play,
                                                                                                      task=handler_task,
                                                                                                      _hosts=self._hosts_cache,
                                                                                                      _hosts_all=self._hosts_cache_all)
                                handler_task.name = handler_templar.template(handler_task.name)
                            handler_task.cached_name = True

                        try:
                            # first we check with the full result of get_name(), which may
                            # include the role name (if the handler is from a role). If that
                            # is not found, we resort to the simple name field, which doesn't
                            # have anything extra added to it.
                            candidates = (
                                handler_task.name,
                                handler_task.get_name(include_role_fqcn=False),
                                handler_task.get_name(include_role_fqcn=True),
                            )

                            if handler_name in candidates:
                                return handler_task
                        except (UndefinedError, AnsibleUndefinedVariable):
                            # We skip this handler due to the fact that it may be using
                            # a variable in the name that was conditionally included via
                            # set_fact or some other method, and we don't want to error
                            # out unnecessarily
                            continue
            return None
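
        # Handler lookup (see the helper above): the templated name is checked
        # alongside get_name() without and with the role FQCN, and blocks are
        # scanned in reverse so the last loaded handler with a matching name wins.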

        cur_pass = 0
        while True:
            try:
                self._results_lock.acquire()
                if do_handlers:
                    task_result = self._handler_results.popleft()
                else:
                    task_result = self._results.popleft()
            except IndexError:
                break
            finally:
                self._results_lock.release()

            # get the original host and task. We then assign them to the TaskResult for use in callbacks/etc.
            original_host = get_original_host(task_result._host)
            queue_cache_entry = (original_host.name, task_result._task)
            found_task = self._queued_task_cache.get(queue_cache_entry)['task']
            original_task = found_task.copy(exclude_parent=True, exclude_tasks=True)
            original_task._parent = found_task._parent
            original_task.from_attrs(task_result._task_fields)

            task_result._host = original_host
            task_result._task = original_task

            # send callbacks for 'non final' results
            if '_ansible_retry' in task_result._result:
                self._tqm.send_callback('v2_runner_retry', task_result)
                continue
            elif '_ansible_item_result' in task_result._result:
                if task_result.is_failed() or task_result.is_unreachable():
                    self._tqm.send_callback('v2_runner_item_on_failed', task_result)
                elif task_result.is_skipped():
                    self._tqm.send_callback('v2_runner_item_on_skipped', task_result)
                else:
                    if 'diff' in task_result._result:
                        if self._diff or getattr(original_task, 'diff', False):
                            self._tqm.send_callback('v2_on_file_diff', task_result)
                    self._tqm.send_callback('v2_runner_item_on_ok', task_result)
                continue

            # all host status messages contain 2 entries: (msg, task_result)
            role_ran = False
            if task_result.is_failed():
                role_ran = True
                ignore_errors = original_task.ignore_errors
                if not ignore_errors:
                    display.debug("marking %s as failed" % original_host.name)
                    if original_task.run_once:
                        # if we're using run_once, we have to fail every host here
                        for h in self._inventory.get_hosts(iterator._play.hosts):
                            if h.name not in self._tqm._unreachable_hosts:
                                state, _ = iterator.get_next_task_for_host(h, peek=True)
                                iterator.mark_host_failed(h)
                                state, new_task = iterator.get_next_task_for_host(h, peek=True)
                    else:
                        iterator.mark_host_failed(original_host)

                    # grab the current state and if we're iterating on the rescue portion
                    # of a block then we save the failed task in a special var for use
                    # within the rescue/always
                    state, _ = iterator.get_next_task_for_host(original_host, peek=True)

                    if iterator.is_failed(original_host) and state and state.run_state == iterator.ITERATING_COMPLETE:
                        self._tqm._failed_hosts[original_host.name] = True

                    # Use of get_active_state() here helps detect proper state if, say, we are in a rescue
                    # block from an included file (include_tasks). In a non-included rescue case, a rescue
                    # that starts with a new 'block' will have an active state of ITERATING_TASKS, so we also
                    # check the current state block tree to see if any blocks are rescuing.
                    if state and (iterator.get_active_state(state).run_state == iterator.ITERATING_RESCUE or
                                  iterator.is_any_block_rescuing(state)):
                        self._tqm._stats.increment('rescued', original_host.name)
                        self._variable_manager.set_nonpersistent_facts(
                            original_host.name,
                            dict(
                                ansible_failed_task=original_task.serialize(),
                                ansible_failed_result=task_result._result,
                            ),
                        )
                    else:
                        self._tqm._stats.increment('failures', original_host.name)
                else:
                    self._tqm._stats.increment('ok', original_host.name)
                    self._tqm._stats.increment('ignored', original_host.name)
                    if 'changed' in task_result._result and task_result._result['changed']:
                        self._tqm._stats.increment('changed', original_host.name)
                self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=ignore_errors)
            elif task_result.is_unreachable():
                ignore_unreachable = original_task.ignore_unreachable
                if not ignore_unreachable:
                    self._tqm._unreachable_hosts[original_host.name] = True
                    iterator._play._removed_hosts.append(original_host.name)
                else:
                    self._tqm._stats.increment('skipped', original_host.name)
                    task_result._result['skip_reason'] = 'Host %s is unreachable' % original_host.name
                self._tqm._stats.increment('dark', original_host.name)
                self._tqm.send_callback('v2_runner_on_unreachable', task_result)
            elif task_result.is_skipped():
                self._tqm._stats.increment('skipped', original_host.name)
                self._tqm.send_callback('v2_runner_on_skipped', task_result)
            else:
                role_ran = True

                if original_task.loop:
                    # this task had a loop, and has more than one result, so
                    # loop over all of them instead of a single result
                    result_items = task_result._result.get('results', [])
                else:
                    result_items = [task_result._result]

                for result_item in result_items:
                    if '_ansible_notify' in result_item:
                        if task_result.is_changed():
                            # The shared dictionary for notified handlers is a proxy, which
                            # does not detect when sub-objects within the proxy are modified.
                            # So, per the docs, we reassign the list so the proxy picks up and
                            # notifies all other threads
                            for handler_name in result_item['_ansible_notify']:
                                found = False
                                # Find the handler using the above helper. First we look up the
                                # dependency chain of the current task (if it's from a role), otherwise
                                # we just look through the list of handlers in the current play/all
                                # roles and use the first one that matches the notify name
                                target_handler = search_handler_blocks_by_name(handler_name, iterator._play.handlers)
                                if target_handler is not None:
                                    found = True
                                    if target_handler.notify_host(original_host):
                                        self._tqm.send_callback('v2_playbook_on_notify', target_handler, original_host)

                                for listening_handler_block in iterator._play.handlers:
                                    for listening_handler in listening_handler_block.block:
                                        listeners = getattr(listening_handler, 'listen', []) or []
                                        if not listeners:
                                            continue

                                        listeners = listening_handler.get_validated_value(
                                            'listen', listening_handler._valid_attrs['listen'], listeners, handler_templar
                                        )
                                        if handler_name not in listeners:
                                            continue
                                        else:
                                            found = True

                                        if listening_handler.notify_host(original_host):
                                            self._tqm.send_callback('v2_playbook_on_notify', listening_handler, original_host)

                                # and if none were found, then we raise an error
                                if not found:
                                    msg = ("The requested handler '%s' was not found in either the main handlers list nor in the listening "
                                           "handlers list" % handler_name)
                                    if C.ERROR_ON_MISSING_HANDLER:
                                        raise AnsibleError(msg)
                                    else:
                                        display.warning(msg)

                    if 'add_host' in result_item:
                        # this task added a new host (add_host module)
                        new_host_info = result_item.get('add_host', dict())
                        self._add_host(new_host_info, result_item)
                        post_process_whens(result_item, original_task, handler_templar)

                    elif 'add_group' in result_item:
                        # this task added a new group (group_by module)
                        self._add_group(original_host, result_item)
                        post_process_whens(result_item, original_task, handler_templar)

                    if 'ansible_facts' in result_item:
                        # if delegated fact and we are delegating facts, we need to change target host for them
                        if original_task.delegate_to is not None and original_task.delegate_facts:
                            host_list = self.get_delegated_hosts(result_item, original_task)
                        else:
                            # Set facts that should always be on the delegated hosts
                            self._set_always_delegated_facts(result_item, original_task)

                            host_list = self.get_task_hosts(iterator, original_host, original_task)

                        if original_task.action in C._ACTION_INCLUDE_VARS:
                            for (var_name, var_value) in iteritems(result_item['ansible_facts']):
                                # find the host we're actually referring too here, which may
                                # be a host that is not really in inventory at all
                                for target_host in host_list:
                                    self._variable_manager.set_host_variable(target_host, var_name, var_value)
                        else:
                            cacheable = result_item.pop('_ansible_facts_cacheable', False)
                            for target_host in host_list:
                                # so set_fact is a misnomer but 'cacheable = true' was meant to create an 'actual fact'
                                # to avoid issues with precedence and confusion with set_fact normal operation,
                                # we set BOTH fact and nonpersistent_facts (aka hostvar)
                                # when fact is retrieved from cache in subsequent operations it will have the lower precedence,
                                # but for playbook setting it the 'higher' precedence is kept
                                is_set_fact = original_task.action in C._ACTION_SET_FACT
                                if not is_set_fact or cacheable:
                                    self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy())
                                if is_set_fact:
                                    self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy())

                    if 'ansible_stats' in result_item and 'data' in result_item['ansible_stats'] and result_item['ansible_stats']['data']:

                        if 'per_host' not in result_item['ansible_stats'] or result_item['ansible_stats']['per_host']:
                            host_list = self.get_task_hosts(iterator, original_host, original_task)
                        else:
                            host_list = [None]

                        data = result_item['ansible_stats']['data']
                        aggregate = 'aggregate' in result_item['ansible_stats'] and result_item['ansible_stats']['aggregate']
                        for myhost in host_list:
                            for k in data.keys():
                                if aggregate:
                                    self._tqm._stats.update_custom_stats(k, data[k], myhost)
                                else:
                                    self._tqm._stats.set_custom_stats(k, data[k], myhost)

                if 'diff' in task_result._result:
                    if self._diff or getattr(original_task, 'diff', False):
                        self._tqm.send_callback('v2_on_file_diff', task_result)

                if not isinstance(original_task, TaskInclude):
                    self._tqm._stats.increment('ok', original_host.name)
                    if 'changed' in task_result._result and task_result._result['changed']:
                        self._tqm._stats.increment('changed', original_host.name)

                # finally, send the ok for this task
                self._tqm.send_callback('v2_runner_on_ok', task_result)

            # register final results
            if original_task.register:
                host_list = self.get_task_hosts(iterator, original_host, original_task)

                clean_copy = strip_internal_keys(module_response_deepcopy(task_result._result))
                if 'invocation' in clean_copy:
                    del clean_copy['invocation']

                for target_host in host_list:
                    self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy})

            if do_handlers:
                self._pending_handler_results -= 1
            else:
                self._pending_results -= 1
            if original_host.name in self._blocked_hosts:
                del self._blocked_hosts[original_host.name]

            # If this is a role task, mark the parent role as being run (if
            # the task was ok or failed, but not skipped or unreachable)
            if original_task._role is not None and role_ran:  # TODO: and original_task.action not in C._ACTION_INCLUDE_ROLE:?
                # lookup the role in the ROLE_CACHE to make sure we're dealing
                # with the correct object and mark it as executed
                for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[original_task._role.get_name()]):
                    if role_obj._uuid == original_task._role._uuid:
                        role_obj._had_task_run[original_host.name] = True

            ret_results.append(task_result)

            if one_pass or max_passes is not None and (cur_pass + 1) >= max_passes:
                break

            cur_pass += 1

        return ret_results

    def _wait_on_handler_results(self, iterator, handler, notified_hosts):
        '''
        Wait for the handler tasks to complete, using a short sleep
        between checks to ensure we don't spin lock
        '''

        ret_results = []
        handler_results = 0

        display.debug("waiting for handler results...")
        while (self._pending_handler_results > 0 and
               handler_results < len(notified_hosts) and
               not self._tqm._terminated):

            if self._tqm.has_dead_workers():
                raise AnsibleError("A worker was found in a dead state")

            results = self._process_pending_results(iterator, do_handlers=True)
            ret_results.extend(results)
            handler_results += len([
                r._host for r in results if r._host in notified_hosts and
                r.task_name == handler.name])
            if self._pending_handler_results > 0:
                time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL)

        display.debug("no more pending handlers, returning what we have")

        return ret_results

    def _wait_on_pending_results(self, iterator):
        '''
        Wait for the shared counter to drop to zero, using a short sleep
        between checks to ensure we don't spin lock
        '''

        ret_results = []

        display.debug("waiting for pending results...")
        while self._pending_results > 0 and not self._tqm._terminated:

            if self._tqm.has_dead_workers():
                raise AnsibleError("A worker was found in a dead state")

            results = self._process_pending_results(iterator)
            ret_results.extend(results)
            if self._pending_results > 0:
                time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL)

        display.debug("no more pending results, returning what we have")

        return ret_results
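
    # Both _wait_on_* helpers poll rather than block: they drain whatever
    # results have already arrived, then sleep C.DEFAULT_INTERNAL_POLL_INTERVAL
    # before checking again so the loop does not busy-spin.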

    def _add_host(self, host_info, result_item):
        '''
        Helper function to add a new host to inventory based on a task result.
        '''

        changed = False

        if host_info:
            host_name = host_info.get('host_name')

            # Check if host in inventory, add if not
            if host_name not in self._inventory.hosts:
                self._inventory.add_host(host_name, 'all')
                self._hosts_cache_all.append(host_name)
                changed = True
            new_host = self._inventory.hosts.get(host_name)

            # Set/update the vars for this host
            new_host_vars = new_host.get_vars()
            new_host_combined_vars = combine_vars(new_host_vars, host_info.get('host_vars', dict()))
            if new_host_vars != new_host_combined_vars:
                new_host.vars = new_host_combined_vars
                changed = True

            new_groups = host_info.get('groups', [])
            for group_name in new_groups:
                if group_name not in self._inventory.groups:
                    group_name = self._inventory.add_group(group_name)
                    changed = True
                new_group = self._inventory.groups[group_name]
                if new_group.add_host(self._inventory.hosts[host_name]):
                    changed = True

            # reconcile inventory, ensures inventory rules are followed
            if changed:
                self._inventory.reconcile_inventory()

        result_item['changed'] = changed
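
    # Example (illustrative): an add_host result such as
    #   {'add_host': {'host_name': 'web1', 'groups': ['webservers']}}
    # creates 'web1' if it is missing, joins it to 'webservers', and reconciles
    # inventory only when something actually changed.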

    def _add_group(self, host, result_item):
        '''
        Helper function to add a group (if it does not exist), and to assign the
        specified host to that group.
        '''

        changed = False

        # the host here is from the executor side, which means it was a
        # serialized/cloned copy and we'll need to look up the proper
        # host object from the master inventory
        real_host = self._inventory.hosts.get(host.name)
        if real_host is None:
            if host.name == self._inventory.localhost.name:
                real_host = self._inventory.localhost
            else:
                raise AnsibleError('%s cannot be matched in inventory' % host.name)
        group_name = result_item.get('add_group')
        parent_group_names = result_item.get('parent_groups', [])

        if group_name not in self._inventory.groups:
            group_name = self._inventory.add_group(group_name)

        for name in parent_group_names:
            if name not in self._inventory.groups:
                # create the new group and add it to inventory
                self._inventory.add_group(name)
                changed = True

        group = self._inventory.groups[group_name]
        for parent_group_name in parent_group_names:
            parent_group = self._inventory.groups[parent_group_name]
            new = parent_group.add_child_group(group)
            if new and not changed:
                changed = True

        if real_host not in group.get_hosts():
            changed = group.add_host(real_host)

        if group not in real_host.get_groups():
            changed = real_host.add_group(group)

        if changed:
            self._inventory.reconcile_inventory()

        result_item['changed'] = changed

    def _copy_included_file(self, included_file):
        '''
        A proven safe and performant way to create a copy of an included file
        '''
        ti_copy = included_file._task.copy(exclude_parent=True)
        ti_copy._parent = included_file._task._parent

        temp_vars = ti_copy.vars.copy()
        temp_vars.update(included_file._vars)

        ti_copy.vars = temp_vars

        return ti_copy

    def _load_included_file(self, included_file, iterator, is_handler=False):
        '''
        Loads an included YAML file of tasks, applying the optional set of variables.
        '''

        display.debug("loading included file: %s" % included_file._filename)
        try:
            data = self._loader.load_from_file(included_file._filename)
            if data is None:
                return []
            elif not isinstance(data, list):
                raise AnsibleError("included task files must contain a list of tasks")

            ti_copy = self._copy_included_file(included_file)
            # pop tags out of the include args, if they were specified there, and assign
            # them to the include. If the include already had tags specified, we raise an
            # error so that users know not to specify them both ways
            tags = included_file._task.vars.pop('tags', [])
            if isinstance(tags, string_types):
                tags = tags.split(',')
            if len(tags) > 0:
                if len(included_file._task.tags) > 0:
                    raise AnsibleParserError("Include tasks should not specify tags in more than one way (both via args and directly on the task). "
                                             "Mixing tag specify styles is prohibited for whole import hierarchy, not only for single import statement",
                                             obj=included_file._task._ds)
                display.deprecated("You should not specify tags in the include parameters. All tags should be specified using the task-level option",
                                   version='2.12', collection_name='ansible.builtin')
                included_file._task.tags = tags

            block_list = load_list_of_blocks(
                data,
                play=iterator._play,
                parent_block=ti_copy.build_parent_block(),
                role=included_file._task._role,
                use_handlers=is_handler,
                loader=self._loader,
                variable_manager=self._variable_manager,
            )

            # since we skip incrementing the stats when the task result is
            # first processed, we do so now for each host in the list
            for host in included_file._hosts:
                self._tqm._stats.increment('ok', host.name)

        except AnsibleError as e:
            if isinstance(e, AnsibleFileNotFound):
                reason = "Could not find or access '%s' on the Ansible Controller." % to_text(e.file_name)
            else:
                reason = to_text(e)

            # mark all of the hosts including this file as failed, send callbacks,
            # and increment the stats for this host
            for host in included_file._hosts:
                tr = TaskResult(host=host, task=included_file._task, return_data=dict(failed=True, reason=reason))
                iterator.mark_host_failed(host)
                self._tqm._failed_hosts[host.name] = True
                self._tqm._stats.increment('failures', host.name)
                self._tqm.send_callback('v2_runner_on_failed', tr)
            return []

        # finally, send the callback and return the list of blocks loaded
        self._tqm.send_callback('v2_playbook_on_include', included_file)
        display.debug("done processing included file")
        return block_list
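
    # Example (illustrative, old-style include syntax): a task like
    #   - include: tasks.yml tags=web
    # surfaces 'tags' through the include's vars; the method above pops it,
    # splits comma-separated strings, and raises if tags were also set
    # directly on the include task.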

    def run_handlers(self, iterator, play_context):
        '''
        Runs handlers on those hosts which have been notified.
        '''

        result = self._tqm.RUN_OK

        for handler_block in iterator._play.handlers:
            # FIXME: handlers need to support the rescue/always portions of blocks too,
            #        but this may take some work in the iterator and gets tricky when
            #        we consider the ability of meta tasks to flush handlers
            for handler in handler_block.block:
                if handler.notified_hosts:
                    result = self._do_handler_run(handler, handler.get_name(), iterator=iterator, play_context=play_context)
                    if not result:
                        break
        return result

    def _do_handler_run(self, handler, handler_name, iterator, play_context, notified_hosts=None):

        # FIXME: need to use iterator.get_failed_hosts() instead?
        # if not len(self.get_hosts_remaining(iterator._play)):
        #     self._tqm.send_callback('v2_playbook_on_no_hosts_remaining')
        #     result = False
        #     break
        if notified_hosts is None:
            notified_hosts = handler.notified_hosts[:]

        # strategy plugins that filter hosts need access to the iterator to identify failed hosts
        failed_hosts = self._filter_notified_failed_hosts(iterator, notified_hosts)
        notified_hosts = self._filter_notified_hosts(notified_hosts)
        notified_hosts += failed_hosts

        if len(notified_hosts) > 0:
            self._tqm.send_callback('v2_playbook_on_handler_task_start', handler)

        bypass_host_loop = False
        try:
            action = plugin_loader.action_loader.get(handler.action, class_only=True, collection_list=handler.collections)
            if getattr(action, 'BYPASS_HOST_LOOP', False):
                bypass_host_loop = True
        except KeyError:
            # we don't care here, because the action may simply not have a
            # corresponding action plugin
            pass

        host_results = []
        for host in notified_hosts:
            if not iterator.is_failed(host) or iterator._play.force_handlers:
                task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=handler,
                                                            _hosts=self._hosts_cache, _hosts_all=self._hosts_cache_all)
                self.add_tqm_variables(task_vars, play=iterator._play)
                templar = Templar(loader=self._loader, variables=task_vars)
                if not handler.cached_name:
                    handler.name = templar.template(handler.name)
                    handler.cached_name = True

                self._queue_task(host, handler, task_vars, play_context)

                if templar.template(handler.run_once) or bypass_host_loop:
                    break

        # collect the results from the handler run
        host_results = self._wait_on_handler_results(iterator, handler, notified_hosts)

        included_files = IncludedFile.process_include_results(
            host_results,
            iterator=iterator,
            loader=self._loader,
            variable_manager=self._variable_manager
        )

        result = True
        if len(included_files) > 0:
            for included_file in included_files:
                try:
                    new_blocks = self._load_included_file(included_file, iterator=iterator, is_handler=True)
                    # for every task in each block brought in by the include, add the list
                    # of hosts which included the file to the notified_handlers dict
                    for block in new_blocks:
                        iterator._play.handlers.append(block)
                        for task in block.block:
                            task_name = task.get_name()
                            display.debug("adding task '%s' included in handler '%s'" % (task_name, handler_name))
                            task.notified_hosts = included_file._hosts[:]
                            result = self._do_handler_run(
                                handler=task,
                                handler_name=task_name,
                                iterator=iterator,
                                play_context=play_context,
                                notified_hosts=included_file._hosts[:],
                            )
                            if not result:
                                break
                except AnsibleError as e:
                    for host in included_file._hosts:
                        iterator.mark_host_failed(host)
                        self._tqm._failed_hosts[host.name] = True
                    display.warning(to_text(e))
                    continue

        # remove hosts from notification list
        handler.notified_hosts = [
            h for h in handler.notified_hosts
            if h not in notified_hosts]
        display.debug("done running handlers, result is: %s" % result)
        return result

    def _filter_notified_failed_hosts(self, iterator, notified_hosts):
        return []

    def _filter_notified_hosts(self, notified_hosts):
        '''
        Filter notified hosts according to the strategy
        '''

        # As main strategy is linear, we do not filter hosts
        # We return a copy to avoid race conditions
        return notified_hosts[:]
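
    # These hooks let strategy subclasses restrict which notified hosts actually
    # run a handler; the base class filters nothing, returning a copy of the
    # notified hosts and an empty list of failed ones.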

    def _take_step(self, task, host=None):

        ret = False
        msg = u'Perform task: %s ' % task
        if host:
            msg += u'on %s ' % host
        msg += u'(N)o/(y)es/(c)ontinue: '
        resp = display.prompt(msg)

        if resp.lower() in ['y', 'yes']:
            display.debug("User ran task")
            ret = True
        elif resp.lower() in ['c', 'continue']:
            display.debug("User ran task and canceled step mode")
            self._step = False
            ret = True
        else:
            display.debug("User skipped task")

        display.banner(msg)

        return ret

    def _cond_not_supported_warn(self, task_name):
        display.warning("%s task does not support when conditional" % task_name)

    def _execute_meta(self, task, play_context, iterator, target_host):

        # meta tasks store their args in the _raw_params field of args,
        # since they do not use k=v pairs, so get that
        meta_action = task.args.get('_raw_params')

        def _evaluate_conditional(h):
            all_vars = self._variable_manager.get_vars(play=iterator._play, host=h, task=task,
                                                       _hosts=self._hosts_cache, _hosts_all=self._hosts_cache_all)
            templar = Templar(loader=self._loader, variables=all_vars)
            return task.evaluate_conditional(templar, all_vars)

        skipped = False
        msg = ''
        skip_reason = '%s conditional evaluated to False' % meta_action
        self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)

        # These don't support "when" conditionals
        if meta_action in ('noop', 'flush_handlers', 'refresh_inventory', 'reset_connection') and task.when:
            self._cond_not_supported_warn(meta_action)

        if meta_action == 'noop':
            msg = "noop"
        elif meta_action == 'flush_handlers':
            self._flushed_hosts[target_host] = True
            self.run_handlers(iterator, play_context)
            self._flushed_hosts[target_host] = False
            msg = "ran handlers"
        elif meta_action == 'refresh_inventory':
            self._inventory.refresh_inventory()
            self._set_hosts_cache(iterator._play)
            msg = "inventory successfully refreshed"
        elif meta_action == 'clear_facts':
            if _evaluate_conditional(target_host):
                for host in self._inventory.get_hosts(iterator._play.hosts):
                    hostname = host.get_name()
                    self._variable_manager.clear_facts(hostname)
                msg = "facts cleared"
            else:
                skipped = True
                skip_reason += ', not clearing facts and fact cache for %s' % target_host.name
        elif meta_action == 'clear_host_errors':
            if _evaluate_conditional(target_host):
                for host in self._inventory.get_hosts(iterator._play.hosts):
                    self._tqm._failed_hosts.pop(host.name, False)
                    self._tqm._unreachable_hosts.pop(host.name, False)
                    iterator._host_states[host.name].fail_state = iterator.FAILED_NONE
                msg = "cleared host errors"
            else:
                skipped = True
                skip_reason += ', not clearing host error state for %s' % target_host.name
        elif meta_action == 'end_play':
            if _evaluate_conditional(target_host):
                for host in self._inventory.get_hosts(iterator._play.hosts):
                    if host.name not in self._tqm._unreachable_hosts:
                        iterator._host_states[host.name].run_state = iterator.ITERATING_COMPLETE
                msg = "ending play"
            else:
                skipped = True
                skip_reason += ', continuing play'
        elif meta_action == 'end_host':
            if _evaluate_conditional(target_host):
                iterator._host_states[target_host.name].run_state = iterator.ITERATING_COMPLETE
                iterator._play._removed_hosts.append(target_host.name)
                msg = "ending play for %s" % target_host.name
            else:
                skipped = True
                skip_reason += ", continuing execution for %s" % target_host.name
                # TODO: Nix msg here? Left for historical reasons, but skip_reason exists now.
                msg = "end_host conditional evaluated to false, continuing execution for %s" % target_host.name
        elif meta_action == 'role_complete':
            # Allow users to use this in a play as reported in https://github.com/ansible/ansible/issues/22286?
            # How would this work with allow_duplicates??
            if task.implicit:
                if target_host.name in task._role._had_task_run:
                    task._role._completed[target_host.name] = True
                    msg = 'role_complete for %s' % target_host.name
        elif meta_action == 'reset_connection':
            all_vars = self._variable_manager.get_vars(play=iterator._play, host=target_host, task=task,
                                                       _hosts=self._hosts_cache, _hosts_all=self._hosts_cache_all)
            templar = Templar(loader=self._loader, variables=all_vars)

            # apply the given task's information to the connection info,
            # which may override some fields already set by the play or
            # the options specified on the command line
            play_context = play_context.set_task_and_variable_override(task=task, variables=all_vars, templar=templar)

            # fields set from the play/task may be based on variables, so we have to
            # do the same kind of post validation step on it here before we use it.
            play_context.post_validate(templar=templar)

            # now that the play context is finalized, if the remote_addr is not set
            # default to using the host's address field as the remote address
            if not play_context.remote_addr:
                play_context.remote_addr = target_host.address

            # We also add "magic" variables back into the variables dict to make sure
            # a certain subset of variables exist.
            play_context.update_vars(all_vars)

            if target_host in self._active_connections:
                connection = Connection(self._active_connections[target_host])
                del self._active_connections[target_host]
            else:
                connection = plugin_loader.connection_loader.get(play_context.connection, play_context, os.devnull)
                play_context.set_attributes_from_plugin(connection)

            if connection:
                try:
                    connection.reset()
                    msg = 'reset connection'
                except ConnectionError as e:
                    # most likely socket is already closed
                    display.debug("got an error while closing persistent connection: %s" % e)
            else:
                msg = 'no connection, nothing to reset'
        else:
            raise AnsibleError("invalid meta action requested: %s" % meta_action, obj=task._ds)

        result = {'msg': msg}
        if skipped:
            result['skipped'] = True
            result['skip_reason'] = skip_reason
        else:
            result['changed'] = False

        display.vv("META: %s" % msg)

        res = TaskResult(target_host, task, result)
        if skipped:
            self._tqm.send_callback('v2_runner_on_skipped', res)
        return [res]
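
    # Meta tasks never reach a worker: _execute_meta() runs on the controller
    # and synthesizes the TaskResult locally, which is why the result dict above
    # only ever carries msg/skipped/skip_reason/changed keys.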

    def get_hosts_left(self, iterator):
        ''' returns list of available hosts for this iterator by filtering out unreachables '''

        hosts_left = []
        for host in self._hosts_cache:
            if host not in self._tqm._unreachable_hosts:
                try:
                    hosts_left.append(self._inventory.hosts[host])
                except KeyError:
                    hosts_left.append(self._inventory.get_host(host))
        return hosts_left
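
    # The socket paths recorded below are how persistent connections are found
    # again later: cleanup() and the 'reset_connection' meta action both look
    # them up in self._active_connections.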

    def update_active_connections(self, results):
        ''' updates the current active persistent connections '''
        for r in results:
            if 'args' in r._task_fields:
                socket_path = r._task_fields['args'].get('_ansible_socket')
                if socket_path:
                    if r._host not in self._active_connections:
                        self._active_connections[r._host] = socket_path


class NextAction(object):
    """ The next action after an interpreter's exit. """
    REDO = 1
    CONTINUE = 2
    EXIT = 3

    def __init__(self, result=EXIT):
        self.result = result


class Debugger(cmd.Cmd):
    prompt_continuous = '> '  # multiple lines

    def __init__(self, task, host, task_vars, play_context, result, next_action):
        # cmd.Cmd is old-style class
        cmd.Cmd.__init__(self)

        self.prompt = '[%s] %s (debug)> ' % (host, task)
        self.intro = None
        self.scope = {}
        self.scope['task'] = task
        self.scope['task_vars'] = task_vars
        self.scope['host'] = host
        self.scope['play_context'] = play_context
        self.scope['result'] = result
        self.next_action = next_action
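
    # The commands below loosely mirror pdb: p/pprint evaluates an expression
    # in the debugger scope, r/redo requeues the task, c/continue accepts the
    # current result, q/quit aborts the run, and any other input is executed
    # as Python in the task's scope.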

    def cmdloop(self):
        try:
            cmd.Cmd.cmdloop(self)
        except KeyboardInterrupt:
            pass

    do_h = cmd.Cmd.do_help

    def do_EOF(self, args):
        """Quit"""
        return self.do_quit(args)

    def do_quit(self, args):
        """Quit"""
        display.display('User interrupted execution')
        self.next_action.result = NextAction.EXIT
        return True

    do_q = do_quit

    def do_continue(self, args):
        """Continue to next result"""
        self.next_action.result = NextAction.CONTINUE
        return True

    do_c = do_continue

    def do_redo(self, args):
        """Schedule task for re-execution. The re-execution may not be the next result"""
        self.next_action.result = NextAction.REDO
        return True

    do_r = do_redo

    def do_update_task(self, args):
        """Recreate the task from ``task._ds``, and template with updated ``task_vars``"""
        templar = Templar(None, variables=self.scope['task_vars'])
        task = self.scope['task']
        task = task.load_data(task._ds)
        task.post_validate(templar)
        self.scope['task'] = task

    do_u = do_update_task

    def evaluate(self, args):
        try:
            return eval(args, globals(), self.scope)
        except Exception:
            t, v = sys.exc_info()[:2]
            if isinstance(t, str):
                exc_type_name = t
            else:
                exc_type_name = t.__name__
            display.display('***%s:%s' % (exc_type_name, repr(v)))
            raise

    def do_pprint(self, args):
        """Pretty Print"""
        try:
            result = self.evaluate(args)
            display.display(pprint.pformat(result))
        except Exception:
            pass

    do_p = do_pprint

    def execute(self, args):
        try:
            code = compile(args + '\n', '<stdin>', 'single')
            exec(code, globals(), self.scope)
        except Exception:
            t, v = sys.exc_info()[:2]
            if isinstance(t, str):
                exc_type_name = t
            else:
                exc_type_name = t.__name__
            display.display('***%s:%s' % (exc_type_name, repr(v)))
            raise

    def default(self, line):
        try:
            self.execute(line)
        except Exception:
            pass