Stash post-validated fields of the task in the TaskResult
This allows us to have a snapshot of the fields, which we can restore on the pre-fork side so as to avoid having to re-template fields.
This commit is contained in:
parent
40235d7b99
commit
3d65482927
5 changed files with 54 additions and 26 deletions
|
@ -115,7 +115,12 @@ class WorkerProcess(multiprocessing.Process):
|
||||||
display.debug("done running TaskExecutor() for %s/%s" % (self._host, self._task))
|
display.debug("done running TaskExecutor() for %s/%s" % (self._host, self._task))
|
||||||
self._host.vars = dict()
|
self._host.vars = dict()
|
||||||
self._host.groups = []
|
self._host.groups = []
|
||||||
task_result = TaskResult(self._host.name, self._task._uuid, executor_result)
|
task_result = TaskResult(
|
||||||
|
self._host.name,
|
||||||
|
self._task._uuid,
|
||||||
|
executor_result,
|
||||||
|
task_fields=self._task.dump_attrs(),
|
||||||
|
)
|
||||||
|
|
||||||
# put the result on the result queue
|
# put the result on the result queue
|
||||||
display.debug("sending task result")
|
display.debug("sending task result")
|
||||||
|
@ -125,7 +130,12 @@ class WorkerProcess(multiprocessing.Process):
|
||||||
except AnsibleConnectionFailure:
|
except AnsibleConnectionFailure:
|
||||||
self._host.vars = dict()
|
self._host.vars = dict()
|
||||||
self._host.groups = []
|
self._host.groups = []
|
||||||
task_result = TaskResult(self._host.name, self._task._uuid, dict(unreachable=True))
|
task_result = TaskResult(
|
||||||
|
self._host.name,
|
||||||
|
self._task._uuid,
|
||||||
|
dict(unreachable=True),
|
||||||
|
task_fields=self._task.dump_attrs(),
|
||||||
|
)
|
||||||
self._rslt_q.put(task_result, block=False)
|
self._rslt_q.put(task_result, block=False)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -133,7 +143,12 @@ class WorkerProcess(multiprocessing.Process):
|
||||||
try:
|
try:
|
||||||
self._host.vars = dict()
|
self._host.vars = dict()
|
||||||
self._host.groups = []
|
self._host.groups = []
|
||||||
task_result = TaskResult(self._host.name, self._task._uuid, dict(failed=True, exception=to_text(traceback.format_exc()), stdout=''))
|
task_result = TaskResult(
|
||||||
|
self._host.name,
|
||||||
|
self._task._uuid,
|
||||||
|
dict(failed=True, exception=to_text(traceback.format_exc()), stdout=''),
|
||||||
|
task_fields=self._task.dump_attrs(),
|
||||||
|
)
|
||||||
self._rslt_q.put(task_result, block=False)
|
self._rslt_q.put(task_result, block=False)
|
||||||
except:
|
except:
|
||||||
display.debug(u"WORKER EXCEPTION: %s" % to_text(e))
|
display.debug(u"WORKER EXCEPTION: %s" % to_text(e))
|
||||||
|
|
|
@ -291,7 +291,15 @@ class TaskExecutor:
|
||||||
templar = Templar(loader=self._loader, shared_loader_obj=self._shared_loader_obj, variables=self._job_vars)
|
templar = Templar(loader=self._loader, shared_loader_obj=self._shared_loader_obj, variables=self._job_vars)
|
||||||
res['_ansible_item_label'] = templar.template(label)
|
res['_ansible_item_label'] = templar.template(label)
|
||||||
|
|
||||||
self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, res), block=False)
|
self._rslt_q.put(
|
||||||
|
TaskResult(
|
||||||
|
self._host.name,
|
||||||
|
self._task._uuid,
|
||||||
|
res,
|
||||||
|
task_fields=self._task.dump_attrs(),
|
||||||
|
),
|
||||||
|
block=False,
|
||||||
|
)
|
||||||
results.append(res)
|
results.append(res)
|
||||||
del task_vars[loop_var]
|
del task_vars[loop_var]
|
||||||
|
|
||||||
|
@ -565,7 +573,7 @@ class TaskExecutor:
|
||||||
result['_ansible_retry'] = True
|
result['_ansible_retry'] = True
|
||||||
result['retries'] = retries
|
result['retries'] = retries
|
||||||
display.debug('Retrying task, attempt %d of %d' % (attempt, retries))
|
display.debug('Retrying task, attempt %d of %d' % (attempt, retries))
|
||||||
self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, result), block=False)
|
self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, result, task_fields=self._task.dump_attrs()), block=False)
|
||||||
time.sleep(delay)
|
time.sleep(delay)
|
||||||
else:
|
else:
|
||||||
if retries > 1:
|
if retries > 1:
|
||||||
|
|
|
@ -28,14 +28,20 @@ class TaskResult:
|
||||||
the result of a given task.
|
the result of a given task.
|
||||||
'''
|
'''
|
||||||
|
|
||||||
def __init__(self, host, task, return_data):
|
def __init__(self, host, task, return_data, task_fields=None):
|
||||||
self._host = host
|
self._host = host
|
||||||
self._task = task
|
self._task = task
|
||||||
|
|
||||||
if isinstance(return_data, dict):
|
if isinstance(return_data, dict):
|
||||||
self._result = return_data.copy()
|
self._result = return_data.copy()
|
||||||
else:
|
else:
|
||||||
self._result = DataLoader().load(return_data)
|
self._result = DataLoader().load(return_data)
|
||||||
|
|
||||||
|
if task_fields is None:
|
||||||
|
self._task_fields = dict()
|
||||||
|
else:
|
||||||
|
self._task_fields = task_fields
|
||||||
|
|
||||||
def is_changed(self):
|
def is_changed(self):
|
||||||
return self._check_key('changed')
|
return self._check_key('changed')
|
||||||
|
|
||||||
|
|
|
@ -500,6 +500,15 @@ class Base(with_metaclass(BaseMeta, object)):
|
||||||
|
|
||||||
return [i for i,_ in itertools.groupby(combined) if i is not None]
|
return [i for i,_ in itertools.groupby(combined) if i is not None]
|
||||||
|
|
||||||
|
def dump_attrs(self):
|
||||||
|
'''
|
||||||
|
Dumps all attributes to a dictionary
|
||||||
|
'''
|
||||||
|
attrs = dict()
|
||||||
|
for name in self._valid_attrs.keys():
|
||||||
|
attrs[name] = getattr(self, name)
|
||||||
|
return attrs
|
||||||
|
|
||||||
def serialize(self):
|
def serialize(self):
|
||||||
'''
|
'''
|
||||||
Serializes the object derived from the base object into
|
Serializes the object derived from the base object into
|
||||||
|
@ -509,10 +518,7 @@ class Base(with_metaclass(BaseMeta, object)):
|
||||||
as field attributes.
|
as field attributes.
|
||||||
'''
|
'''
|
||||||
|
|
||||||
repr = dict()
|
repr = self.dump_attrs()
|
||||||
|
|
||||||
for name in self._valid_attrs.keys():
|
|
||||||
repr[name] = getattr(self, name)
|
|
||||||
|
|
||||||
# serialize the uuid field
|
# serialize the uuid field
|
||||||
repr['uuid'] = self._uuid
|
repr['uuid'] = self._uuid
|
||||||
|
|
|
@ -304,11 +304,6 @@ class StrategyBase:
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# a Templar class to use for templating things later, as we're using
|
|
||||||
# original/non-validated objects here on the manager side. We set the
|
|
||||||
# variables in use later inside the loop below
|
|
||||||
templar = Templar(loader=self._loader)
|
|
||||||
|
|
||||||
cur_pass = 0
|
cur_pass = 0
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
|
@ -321,7 +316,11 @@ class StrategyBase:
|
||||||
|
|
||||||
# get the original host and task. We then assign them to the TaskResult for use in callbacks/etc.
|
# get the original host and task. We then assign them to the TaskResult for use in callbacks/etc.
|
||||||
original_host = get_original_host(task_result._host)
|
original_host = get_original_host(task_result._host)
|
||||||
original_task = iterator.get_original_task(original_host, task_result._task)
|
found_task = iterator.get_original_task(original_host, task_result._task)
|
||||||
|
original_task = found_task.copy(exclude_parent=True, exclude_tasks=True)
|
||||||
|
original_task._parent = found_task._parent
|
||||||
|
for (attr, val) in iteritems(task_result._task_fields):
|
||||||
|
setattr(original_task, attr, val)
|
||||||
|
|
||||||
task_result._host = original_host
|
task_result._host = original_host
|
||||||
task_result._task = original_task
|
task_result._task = original_task
|
||||||
|
@ -348,12 +347,6 @@ class StrategyBase:
|
||||||
self._tqm.send_callback('v2_runner_item_on_ok', task_result)
|
self._tqm.send_callback('v2_runner_item_on_ok', task_result)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# get the vars for this task/host pair, make them the active set of vars for our templar above
|
|
||||||
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=original_host, task=original_task)
|
|
||||||
self.add_tqm_variables(task_vars, play=iterator._play)
|
|
||||||
templar.set_available_variables(task_vars)
|
|
||||||
|
|
||||||
run_once = templar.template(original_task.run_once)
|
|
||||||
if original_task.register:
|
if original_task.register:
|
||||||
host_list = self.get_task_hosts(iterator, original_host, original_task)
|
host_list = self.get_task_hosts(iterator, original_host, original_task)
|
||||||
|
|
||||||
|
@ -368,10 +361,10 @@ class StrategyBase:
|
||||||
role_ran = False
|
role_ran = False
|
||||||
if task_result.is_failed():
|
if task_result.is_failed():
|
||||||
role_ran = True
|
role_ran = True
|
||||||
ignore_errors = templar.template(original_task.ignore_errors)
|
ignore_errors = original_task.ignore_errors
|
||||||
if not ignore_errors:
|
if not ignore_errors:
|
||||||
display.debug("marking %s as failed" % original_host.name)
|
display.debug("marking %s as failed" % original_host.name)
|
||||||
if run_once:
|
if original_task.run_once:
|
||||||
# if we're using run_once, we have to fail every host here
|
# if we're using run_once, we have to fail every host here
|
||||||
for h in self._inventory.get_hosts(iterator._play.hosts):
|
for h in self._inventory.get_hosts(iterator._play.hosts):
|
||||||
if h.name not in self._tqm._unreachable_hosts:
|
if h.name not in self._tqm._unreachable_hosts:
|
||||||
|
@ -488,7 +481,7 @@ class StrategyBase:
|
||||||
item = result_item.get(loop_var, None)
|
item = result_item.get(loop_var, None)
|
||||||
if item is not None:
|
if item is not None:
|
||||||
task_vars[loop_var] = item
|
task_vars[loop_var] = item
|
||||||
host_name = templar.template(original_task.delegate_to)
|
host_name = original_task.delegate_to
|
||||||
actual_host = self._inventory.get_host(host_name)
|
actual_host = self._inventory.get_host(host_name)
|
||||||
if actual_host is None:
|
if actual_host is None:
|
||||||
actual_host = Host(name=host_name)
|
actual_host = Host(name=host_name)
|
||||||
|
|
Loading…
Reference in a new issue