diff --git a/lib/ansible/executor/process/result.py b/lib/ansible/executor/process/result.py index 038a68fbef7..75d6da376ab 100644 --- a/lib/ansible/executor/process/result.py +++ b/lib/ansible/executor/process/result.py @@ -72,9 +72,9 @@ class ResultProcess(multiprocessing.Process): self._cur_worker = 0 try: - if not rslt_q.empty(): + if rslt_q.qsize() > 0: debug("worker %d has data to read" % self._cur_worker) - result = rslt_q.get(block=False) + result = rslt_q.get() debug("got a result from worker %d: %s" % (self._cur_worker, result)) break except queue.Empty: @@ -102,7 +102,7 @@ class ResultProcess(multiprocessing.Process): try: result = self._read_worker_result() if result is None: - time.sleep(0.1) + time.sleep(0.01) continue # if this task is registering a result, do it now diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index 595cf872e71..d73434652a6 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -92,40 +92,36 @@ class WorkerProcess(multiprocessing.Process): while True: task = None try: - if not self._main_q.empty(): - debug("there's work to be done!") - (host, task, basedir, job_vars, play_context, shared_loader_obj) = self._main_q.get(block=False) - debug("got a task/handler to work on: %s" % task) + (host, task, basedir, job_vars, play_context, shared_loader_obj) = self._main_q.get() + debug("there's work to be done!") + debug("got a task/handler to work on: %s" % task) - # because the task queue manager starts workers (forks) before the - # playbook is loaded, set the basedir of the loader inherted by - # this fork now so that we can find files correctly - self._loader.set_basedir(basedir) + # because the task queue manager starts workers (forks) before the + # playbook is loaded, set the basedir of the loader inherted by + # this fork now so that we can find files correctly + self._loader.set_basedir(basedir) - # Serializing/deserializing tasks does not preserve the loader attribute, - # since it is passed to the worker during the forking of the process and - # would be wasteful to serialize. So we set it here on the task now, and - # the task handles updating parent/child objects as needed. - task.set_loader(self._loader) + # Serializing/deserializing tasks does not preserve the loader attribute, + # since it is passed to the worker during the forking of the process and + # would be wasteful to serialize. So we set it here on the task now, and + # the task handles updating parent/child objects as needed. + task.set_loader(self._loader) - # apply the given task's information to the connection info, - # which may override some fields already set by the play or - # the options specified on the command line - new_play_context = play_context.set_task_and_variable_override(task=task, variables=job_vars) + # apply the given task's information to the connection info, + # which may override some fields already set by the play or + # the options specified on the command line + new_play_context = play_context.set_task_and_variable_override(task=task, variables=job_vars) - # execute the task and build a TaskResult from the result - debug("running TaskExecutor() for %s/%s" % (host, task)) - executor_result = TaskExecutor(host, task, job_vars, new_play_context, self._new_stdin, self._loader, shared_loader_obj).run() - debug("done running TaskExecutor() for %s/%s" % (host, task)) - task_result = TaskResult(host, task, executor_result) + # execute the task and build a TaskResult from the result + debug("running TaskExecutor() for %s/%s" % (host, task)) + executor_result = TaskExecutor(host, task, job_vars, new_play_context, self._new_stdin, self._loader, shared_loader_obj).run() + debug("done running TaskExecutor() for %s/%s" % (host, task)) + task_result = TaskResult(host, task, executor_result) - # put the result on the result queue - debug("sending task result") - self._rslt_q.put(task_result, block=False) - debug("done sending task result") - - else: - time.sleep(0.1) + # put the result on the result queue + debug("sending task result") + self._rslt_q.put(task_result) + debug("done sending task result") except queue.Empty: pass diff --git a/lib/ansible/playbook/block.py b/lib/ansible/playbook/block.py index c6afe5ba96c..006ac828fe6 100644 --- a/lib/ansible/playbook/block.py +++ b/lib/ansible/playbook/block.py @@ -37,12 +37,13 @@ class Block(Base, Become, Conditional, Taggable): # similar to the 'else' clause for exceptions #_otherwise = FieldAttribute(isa='list') - def __init__(self, play=None, parent_block=None, role=None, task_include=None, use_handlers=False): + def __init__(self, play=None, parent_block=None, role=None, task_include=None, use_handlers=False, implicit=False): self._play = play self._role = role self._task_include = task_include self._parent_block = parent_block self._use_handlers = use_handlers + self._implicit = implicit self._dep_chain = [] super(Block, self).__init__() @@ -66,22 +67,27 @@ class Block(Base, Become, Conditional, Taggable): @staticmethod def load(data, play=None, parent_block=None, role=None, task_include=None, use_handlers=False, variable_manager=None, loader=None): - b = Block(play=play, parent_block=parent_block, role=role, task_include=task_include, use_handlers=use_handlers) + implicit = not Block.is_block(data) + b = Block(play=play, parent_block=parent_block, role=role, task_include=task_include, use_handlers=use_handlers, implicit=implicit) return b.load_data(data, variable_manager=variable_manager, loader=loader) + @staticmethod + def is_block(ds): + is_block = False + if isinstance(ds, dict): + for attr in ('block', 'rescue', 'always'): + if attr in ds: + is_block = True + break + return is_block + def preprocess_data(self, ds): ''' If a simple task is given, an implicit block for that single task is created, which goes in the main portion of the block ''' - is_block = False - for attr in ('block', 'rescue', 'always'): - if attr in ds: - is_block = True - break - - if not is_block: + if not Block.is_block(ds): if isinstance(ds, list): return super(Block, self).preprocess_data(dict(block=ds)) else: diff --git a/lib/ansible/playbook/helpers.py b/lib/ansible/playbook/helpers.py index ca9326b8141..98bef15e2a8 100644 --- a/lib/ansible/playbook/helpers.py +++ b/lib/ansible/playbook/helpers.py @@ -52,7 +52,13 @@ def load_list_of_blocks(ds, play, parent_block=None, role=None, task_include=Non variable_manager=variable_manager, loader=loader ) - block_list.append(b) + # Implicit blocks are created by bare tasks listed in a play withou + # an explicit block statement. If we have two implicit blocks in a row, + # squash them down to a single block to save processing time later. + if b._implicit and len(block_list) > 0 and block_list[-1]._implicit: + block_list[-1].block.extend(b.block) + else: + block_list.append(b) return block_list diff --git a/lib/ansible/vars/hostvars.py b/lib/ansible/vars/hostvars.py index 766efb5ed3b..af3d086ae8c 100644 --- a/lib/ansible/vars/hostvars.py +++ b/lib/ansible/vars/hostvars.py @@ -54,8 +54,15 @@ class HostVars(collections.Mapping): if item and item is not j2undefined: return True return False + def __iter__(self): raise NotImplementedError('HostVars does not support iteration as hosts are discovered on an as needed basis.') def __len__(self): raise NotImplementedError('HostVars does not support len. hosts entries are discovered dynamically as needed') + + def __getstate__(self): + return self._lookup + + def __setstate__(self, data): + self._lookup = data