issue callbacks per item and retry fails

- now workers passes queue to task_executor so it can send back events per item and on retry attempt
- updated result class to pass along events to strategy
- base strategy updated to forward new events to callback
- callbacks now remove 'items' on final result but process them directly when invoked per item
- new callback method to deal with retry attempt messages (also now obeys nolog)
- updated tests to match new signature of task_executor

fixes #14558
fixes #14072
This commit is contained in:
James Cammarata 2016-02-23 15:07:06 -05:00
parent 6eb4633b07
commit e02b98274b
7 changed files with 74 additions and 45 deletions

View file

@ -104,6 +104,19 @@ class ResultProcess(multiprocessing.Process):
time.sleep(0.0001)
continue
# send callbacks for 'non final' results
if '_ansible_retry' in result._result:
self._send_result(('v2_playbook_retry', result))
continue
elif '_ansible_item_result' in result._result:
if result.is_failed() or result.is_unreachable():
self._send_result(('v2_playbook_item_on_failed', result))
elif result.is_skipped():
self._send_result(('v2_playbook_item_on_skipped', result))
else:
self._send_result(('v2_playbook_item_on_ok', result))
continue
clean_copy = strip_internal_keys(result._result)
if 'invocation' in clean_copy:
del clean_copy['invocation']

View file

@ -113,6 +113,7 @@ class WorkerProcess(multiprocessing.Process):
self._new_stdin,
self._loader,
self._shared_loader_obj,
self._rslt_q
).run()
debug("done running TaskExecutor() for %s/%s" % (self._host, self._task))

View file

@ -30,6 +30,7 @@ from ansible.compat.six import iteritems, string_types
from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable, AnsibleConnectionFailure
from ansible.executor.task_result import TaskResult
from ansible.playbook.conditional import Conditional
from ansible.playbook.task import Task
from ansible.template import Templar
@ -60,7 +61,7 @@ class TaskExecutor:
# the module
SQUASH_ACTIONS = frozenset(C.DEFAULT_SQUASH_ACTIONS)
def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj):
def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, rslt_q):
self._host = host
self._task = task
self._job_vars = job_vars
@ -69,6 +70,7 @@ class TaskExecutor:
self._loader = loader
self._shared_loader_obj = shared_loader_obj
self._connection = None
self._rslt_q = rslt_q
def run(self):
'''
@ -242,7 +244,9 @@ class TaskExecutor:
# now update the result with the item info, and append the result
# to the list of results
res['item'] = item
#TODO: send item results to callback here, instead of all at the end
res['_ansible_item_result'] = True
self._rslt_q.put(TaskResult(self._host, self._task, res), block=False)
results.append(res)
return results
@ -416,6 +420,9 @@ class TaskExecutor:
return dict(unreachable=True, msg=to_unicode(e))
display.debug("handler run complete")
# preserve no log
result["_ansible_no_log"] = self._play_context.no_log
# update the local copy of vars with the registered value, if specified,
# or any facts which may have been generated by the module execution
if self._task.register:
@ -465,16 +472,18 @@ class TaskExecutor:
_evaluate_failed_when_result(result)
if attempt < retries - 1:
if retries > 1:
result['attempts'] = attempt + 1
cond = Conditional(loader=self._loader)
cond.when = [ self._task.until ]
if cond.evaluate_conditional(templar, vars_copy):
break
# no conditional check, or it failed, so sleep for the specified time
display.display("FAILED - RETRYING: %s (%d retries left). Result was: %s" % (self._task, retries-(attempt+1), result), color=C.COLOR_DEBUG)
time.sleep(delay)
else:
# no conditional check, or it failed, so sleep for the specified time
result['attempts'] = attempt + 1
result['retries'] = retries
result['_ansible_retry'] = True
display.debug('Retrying task, attempt %d of %d' % (attempt + 1, retries))
self._rslt_q.put(TaskResult(self._host, self._task, result), block=False)
time.sleep(delay)
else:
if retries > 1:
# we ran out of attempts, so mark the result as failed
@ -506,9 +515,6 @@ class TaskExecutor:
for k in ('ansible_host', ):
result["_ansible_delegated_vars"][k] = delegated_vars.get(k)
# preserve no_log setting
result["_ansible_no_log"] = self._play_context.no_log
# and return
display.debug("attempt loop complete, returning result")
return result

View file

@ -172,16 +172,8 @@ class CallbackBase:
return item
def _process_items(self, result):
for res in result._result['results']:
newres = self._copy_result_exclude(result, ['_result'])
res['item'] = self._get_item(res)
newres._result = res
if 'failed' in res and res['failed']:
self.v2_playbook_item_on_failed(newres)
elif 'skipped' in res and res['skipped']:
self.v2_playbook_item_on_skipped(newres)
else:
self.v2_playbook_item_on_ok(newres)
# just remove them as now they get handled by individual callbacks
del result._result['results']
def _clean_results(self, result, task_name):
if 'changed' in result and task_name in ['debug']:
@ -346,15 +338,6 @@ class CallbackBase:
if 'diff' in result._result:
self.on_file_diff(host, result._result['diff'])
def v2_playbook_on_item_ok(self, result):
pass # no v1
def v2_playbook_on_item_failed(self, result):
pass # no v1
def v2_playbook_on_item_skipped(self, result):
pass # no v1
def v2_playbook_on_include(self, included_file):
pass #no v1 correspondance
@ -366,3 +349,7 @@ class CallbackBase:
def v2_playbook_item_on_skipped(self, result):
pass
def v2_playbook_retry(self, result):
pass

View file

@ -51,6 +51,7 @@ class CallbackModule(CallbackBase):
if result._task.loop and 'results' in result._result:
self._process_items(result)
else:
if delegated_vars:
self._display.display("fatal: [%s -> %s]: FAILED! => %s" % (result._host.get_name(), delegated_vars['ansible_host'], self._dump_results(result._result)), color=C.COLOR_ERROR)
@ -159,24 +160,22 @@ class CallbackModule(CallbackBase):
self._display.display(diff)
def v2_playbook_item_on_ok(self, result):
delegated_vars = result._result.get('_ansible_delegated_vars', None)
if result._task.action == 'include':
return
elif result._result.get('changed', False):
if delegated_vars:
msg = "changed: [%s -> %s]" % (result._host.get_name(), delegated_vars['ansible_host'])
else:
msg = "changed: [%s]" % result._host.get_name()
msg = 'changed'
color = C.COLOR_CHANGED
else:
if delegated_vars:
msg = "ok: [%s -> %s]" % (result._host.get_name(), delegated_vars['ansible_host'])
else:
msg = "ok: [%s]" % result._host.get_name()
msg = 'ok'
color = C.COLOR_OK
msg += " => (item=%s)" % (result._result['item'],)
if delegated_vars:
msg += ": [%s -> %s]" % (result._host.get_name(), delegated_vars['ansible_host'])
else:
msg += ": [%s]" % result._host.get_name()
msg += " => (item=%s)" % (self._get_item(result._result))
if (self._display.verbosity > 0 or '_ansible_verbose_always' in result._result) and not '_ansible_verbose_override' in result._result:
msg += " => %s" % self._dump_results(result._result)
@ -197,15 +196,17 @@ class CallbackModule(CallbackBase):
# finally, remove the exception from the result so it's not shown every time
del result._result['exception']
msg = "failed: "
if delegated_vars:
self._display.display("failed: [%s -> %s] => (item=%s) => %s" % (result._host.get_name(), delegated_vars['ansible_host'], result._result['item'], self._dump_results(result._result)), color=C.COLOR_ERROR)
msg += "[%s -> %s]" % (result._host.get_name(), delegated_vars['ansible_host'])
else:
self._display.display("failed: [%s] => (item=%s) => %s" % (result._host.get_name(), result._result['item'], self._dump_results(result._result)), color=C.COLOR_ERROR)
msg += "[%s]" % (result._host.get_name())
self._display.display(msg + " (item=%s) => %s" % (self._get_item(result._result), self._dump_results(result._result)), color=C.COLOR_ERROR)
self._handle_warnings(result._result)
def v2_playbook_item_on_skipped(self, result):
msg = "skipping: [%s] => (item=%s) " % (result._host.get_name(), result._result['item'])
msg = "skipping: [%s] => (item=%s) " % (result._host.get_name(), self._get_item(result._result))
if (self._display.verbosity > 0 or '_ansible_verbose_always' in result._result) and not '_ansible_verbose_override' in result._result:
msg += " => %s" % self._dump_results(result._result)
self._display.display(msg, color=C.COLOR_SKIP)
@ -254,3 +255,9 @@ class CallbackModule(CallbackBase):
val = getattr(self._options,option)
if val:
self._display.vvvv('%s: %s' % (option,val))
def v2_playbook_retry(self, result):
msg = "FAILED - RETRYING: %s (%d retries left)." % (result._task, result._result['retries'] - result._result['attempts'])
if (self._display.verbosity > 2 or '_ansible_verbose_always' in result._result) and not '_ansible_verbose_override' in result._result:
msg += "Result was: %s" % self._dump_results(result._result)
self._display.display(msg, color=C.COLOR_DEBUG)

View file

@ -329,7 +329,8 @@ class StrategyBase:
self._variable_manager.set_nonpersistent_facts(target_host, facts)
else:
self._variable_manager.set_host_facts(target_host, facts)
elif result[0].startswith('v2_playbook_item') or result[0] == 'v2_playbook_retry':
self._tqm.send_callback(result[0], result[1])
else:
raise AnsibleError("unknown result message received: %s" % result[0])

View file

@ -45,6 +45,7 @@ class TestTaskExecutor(unittest.TestCase):
mock_shared_loader = MagicMock()
new_stdin = None
job_vars = dict()
mock_queue = MagicMock()
te = TaskExecutor(
host = mock_host,
task = mock_task,
@ -53,6 +54,7 @@ class TestTaskExecutor(unittest.TestCase):
new_stdin = new_stdin,
loader = fake_loader,
shared_loader_obj = mock_shared_loader,
rslt_q = mock_queue,
)
def test_task_executor_run(self):
@ -66,6 +68,7 @@ class TestTaskExecutor(unittest.TestCase):
mock_play_context = MagicMock()
mock_shared_loader = MagicMock()
mock_queue = MagicMock()
new_stdin = None
job_vars = dict()
@ -78,6 +81,7 @@ class TestTaskExecutor(unittest.TestCase):
new_stdin = new_stdin,
loader = fake_loader,
shared_loader_obj = mock_shared_loader,
rslt_q = mock_queue,
)
te._get_loop_items = MagicMock(return_value=None)
@ -111,6 +115,7 @@ class TestTaskExecutor(unittest.TestCase):
new_stdin = None
job_vars = dict()
mock_queue = MagicMock()
te = TaskExecutor(
host = mock_host,
@ -120,6 +125,7 @@ class TestTaskExecutor(unittest.TestCase):
new_stdin = new_stdin,
loader = fake_loader,
shared_loader_obj = mock_shared_loader,
rslt_q = mock_queue,
)
items = te._get_loop_items()
@ -142,6 +148,7 @@ class TestTaskExecutor(unittest.TestCase):
mock_play_context = MagicMock()
mock_shared_loader = MagicMock()
mock_queue = MagicMock()
new_stdin = None
job_vars = dict()
@ -154,6 +161,7 @@ class TestTaskExecutor(unittest.TestCase):
new_stdin = new_stdin,
loader = fake_loader,
shared_loader_obj = mock_shared_loader,
rslt_q = mock_queue,
)
def _execute(variables):
@ -184,6 +192,7 @@ class TestTaskExecutor(unittest.TestCase):
mock_play_context = MagicMock()
mock_shared_loader = None
mock_queue = MagicMock()
new_stdin = None
job_vars = dict(pkg_mgr='yum')
@ -196,6 +205,7 @@ class TestTaskExecutor(unittest.TestCase):
new_stdin = new_stdin,
loader = fake_loader,
shared_loader_obj = mock_shared_loader,
rslt_q = mock_queue,
)
#
@ -279,6 +289,7 @@ class TestTaskExecutor(unittest.TestCase):
mock_connection._connect.return_value = None
mock_action = MagicMock()
mock_queue = MagicMock()
shared_loader = None
new_stdin = None
@ -292,6 +303,7 @@ class TestTaskExecutor(unittest.TestCase):
new_stdin = new_stdin,
loader = fake_loader,
shared_loader_obj = shared_loader,
rslt_q = mock_queue,
)
te._get_connection = MagicMock(return_value=mock_connection)
@ -330,6 +342,7 @@ class TestTaskExecutor(unittest.TestCase):
mock_connection = MagicMock()
mock_action = MagicMock()
mock_queue = MagicMock()
shared_loader = MagicMock()
shared_loader.action_loader = action_loader
@ -345,6 +358,7 @@ class TestTaskExecutor(unittest.TestCase):
new_stdin = new_stdin,
loader = fake_loader,
shared_loader_obj = shared_loader,
rslt_q = mock_queue,
)
te._connection = MagicMock()