From 4af2d0a9079c8657ca2044337c0b07196915771d Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Thu, 26 Feb 2015 09:51:12 -0600 Subject: [PATCH] Reworking v2 play iterator and fixing some other bugs Still not working quite right: * dynamic includes are not adding the included tasks yet * running roles with tags not quite working right --- v2/ansible/executor/manager.py | 66 --- v2/ansible/executor/play_iterator.py | 414 +++++++----------- v2/ansible/executor/process/result.py | 13 +- v2/ansible/executor/task_executor.py | 5 +- v2/ansible/executor/task_queue_manager.py | 6 - v2/ansible/playbook/base.py | 2 +- v2/ansible/playbook/block.py | 21 +- v2/ansible/playbook/play.py | 18 +- v2/ansible/playbook/role/__init__.py | 16 +- v2/ansible/playbook/task.py | 13 - v2/ansible/plugins/strategies/__init__.py | 59 +-- v2/ansible/plugins/strategies/linear.py | 140 ++++-- v2/samples/include.yml | 4 + v2/samples/localhosts | 3 + v2/samples/roles/common/meta/main.yml | 0 v2/samples/roles/common/tasks/main.yml | 1 + v2/samples/roles/role_a/meta/main.yml | 2 + v2/samples/roles/role_a/tasks/main.yml | 1 + v2/samples/roles/role_b/meta/main.yml | 2 + v2/samples/roles/role_b/tasks/main.yml | 1 + v2/samples/roles/test_role/meta/main.yml | 2 + v2/samples/roles/test_role/tasks/main.yml | 2 + v2/samples/roles/test_role_dep/tasks/main.yml | 1 + v2/samples/test_block.yml | 13 +- v2/samples/test_blocks_of_blocks.yml | 8 + v2/samples/test_include.yml | 26 ++ v2/samples/test_role.yml | 3 +- v2/samples/test_roles_complex.yml | 6 + 28 files changed, 430 insertions(+), 418 deletions(-) delete mode 100644 v2/ansible/executor/manager.py create mode 100644 v2/samples/include.yml create mode 100644 v2/samples/localhosts create mode 100644 v2/samples/roles/common/meta/main.yml create mode 100644 v2/samples/roles/common/tasks/main.yml create mode 100644 v2/samples/roles/role_a/meta/main.yml create mode 100644 v2/samples/roles/role_a/tasks/main.yml create mode 100644 v2/samples/roles/role_b/meta/main.yml create mode 100644 v2/samples/roles/role_b/tasks/main.yml create mode 100644 v2/samples/roles/test_role/meta/main.yml create mode 100644 v2/samples/roles/test_role_dep/tasks/main.yml create mode 100644 v2/samples/test_blocks_of_blocks.yml create mode 100644 v2/samples/test_include.yml create mode 100644 v2/samples/test_roles_complex.yml diff --git a/v2/ansible/executor/manager.py b/v2/ansible/executor/manager.py deleted file mode 100644 index 33a76e143b9..00000000000 --- a/v2/ansible/executor/manager.py +++ /dev/null @@ -1,66 +0,0 @@ -# (c) 2012-2014, Michael DeHaan -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -from multiprocessing.managers import SyncManager, BaseProxy -from ansible.playbook.handler import Handler -from ansible.playbook.task import Task -from ansible.playbook.play import Play -from ansible.errors import AnsibleError - -__all__ = ['AnsibleManager'] - - -class VariableManagerWrapper: - ''' - This class simply acts as a wrapper around the VariableManager class, - since manager proxies expect a new object to be returned rather than - any existing one. Using this wrapper, a shared proxy can be created - and an existing VariableManager class assigned to it, which can then - be accessed through the exposed proxy methods. - ''' - - def __init__(self): - self._vm = None - - def get_vars(self, loader, play=None, host=None, task=None): - return self._vm.get_vars(loader=loader, play=play, host=host, task=task) - - def set_variable_manager(self, vm): - self._vm = vm - - def set_host_variable(self, host, varname, value): - self._vm.set_host_variable(host, varname, value) - - def set_host_facts(self, host, facts): - self._vm.set_host_facts(host, facts) - -class AnsibleManager(SyncManager): - ''' - This is our custom manager class, which exists only so we may register - the new proxy below - ''' - pass - -AnsibleManager.register( - typeid='VariableManagerWrapper', - callable=VariableManagerWrapper, -) - diff --git a/v2/ansible/executor/play_iterator.py b/v2/ansible/executor/play_iterator.py index 2339ce1d45a..8936a216bee 100644 --- a/v2/ansible/executor/play_iterator.py +++ b/v2/ansible/executor/play_iterator.py @@ -26,278 +26,202 @@ from ansible.utils.boolean import boolean __all__ = ['PlayIterator'] +class HostState: + def __init__(self, blocks): + self._blocks = blocks[:] -# the primary running states for the play iteration -ITERATING_SETUP = 0 -ITERATING_TASKS = 1 -ITERATING_RESCUE = 2 -ITERATING_ALWAYS = 3 -ITERATING_COMPLETE = 4 + self.cur_block = 0 + self.cur_regular_task = 0 + self.cur_rescue_task = 0 + self.cur_always_task = 0 + self.cur_role = None + self.run_state = PlayIterator.ITERATING_SETUP + self.fail_state = PlayIterator.FAILED_NONE + self.pending_setup = False -# the failure states for the play iteration -FAILED_NONE = 0 -FAILED_SETUP = 1 -FAILED_TASKS = 2 -FAILED_RESCUE = 3 -FAILED_ALWAYS = 4 + def get_current_block(self): + return self._blocks[self.cur_block] -class PlayState: - - ''' - A helper class, which keeps track of the task iteration - state for a given playbook. This is used in the PlaybookIterator - class on a per-host basis. - ''' - - # FIXME: this class is the representation of a finite state machine, - # so we really should have a well defined state representation - # documented somewhere... - - def __init__(self, parent_iterator, host): - ''' - Create the initial state, which tracks the running state as well - as the failure state, which are used when executing block branches - (rescue/always) - ''' - - self._parent_iterator = parent_iterator - self._run_state = ITERATING_SETUP - self._failed_state = FAILED_NONE - self._gather_facts = parent_iterator._play.gather_facts - #self._task_list = parent_iterator._play.compile() - self._task_list = parent_iterator._task_list[:] - self._host = host - - self._cur_block = None - self._cur_role = None - self._cur_task_pos = 0 - self._cur_rescue_pos = 0 - self._cur_always_pos = 0 - self._cur_handler_pos = 0 - - def next(self, peek=False): - ''' - Determines and returns the next available task from the playbook, - advancing through the list of plays as it goes. If peek is set to True, - the internal state is not stored. - ''' - - task = None - - # save this locally so that we can peek at the next task - # without updating the internal state of the iterator - run_state = self._run_state - failed_state = self._failed_state - cur_block = self._cur_block - cur_role = self._cur_role - cur_task_pos = self._cur_task_pos - cur_rescue_pos = self._cur_rescue_pos - cur_always_pos = self._cur_always_pos - cur_handler_pos = self._cur_handler_pos - - - while True: - if run_state == ITERATING_SETUP: - if failed_state == FAILED_SETUP: - run_state = ITERATING_COMPLETE - else: - run_state = ITERATING_TASKS - - if self._gather_facts == 'smart' and not self._host.gathered_facts or boolean(self._gather_facts): - self._host.set_gathered_facts(True) - task = Task() - # FIXME: this is not the best way to get this... - task.set_loader(self._parent_iterator._play._loader) - task.action = 'setup' - break - elif run_state == ITERATING_TASKS: - # if there is any failure state besides FAILED_NONE, we should - # change to some other running state - if failed_state != FAILED_NONE or cur_task_pos > len(self._task_list) - 1: - # if there is a block (and there always should be), start running - # the rescue portion if it exists (and if we haven't failed that - # already), or the always portion (if it exists and we didn't fail - # there too). Otherwise, we're done iterating. - if cur_block: - if failed_state != FAILED_RESCUE and cur_block.rescue: - run_state = ITERATING_RESCUE - cur_rescue_pos = 0 - elif failed_state != FAILED_ALWAYS and cur_block.always: - run_state = ITERATING_ALWAYS - cur_always_pos = 0 - else: - run_state = ITERATING_COMPLETE - else: - run_state = ITERATING_COMPLETE - else: - task = self._task_list[cur_task_pos] - if cur_block is not None and cur_block != task._block: - run_state = ITERATING_ALWAYS - continue - else: - cur_block = task._block - cur_task_pos += 1 - - # Break out of the while loop now that we have our task - break - - elif run_state == ITERATING_RESCUE: - # If we're iterating through the rescue tasks, make sure we haven't - # failed yet. If so, move on to the always block or if not get the - # next rescue task (if one exists) - if failed_state == FAILED_RESCUE or cur_block.rescue is None or cur_rescue_pos > len(cur_block.rescue) - 1: - run_state = ITERATING_ALWAYS - else: - task = cur_block.rescue[cur_rescue_pos] - cur_rescue_pos += 1 - break - - elif run_state == ITERATING_ALWAYS: - # If we're iterating through the always tasks, make sure we haven't - # failed yet. If so, we're done iterating otherwise get the next always - # task (if one exists) - if failed_state == FAILED_ALWAYS or cur_block.always is None or cur_always_pos > len(cur_block.always) - 1: - cur_block = None - if failed_state == FAILED_ALWAYS or cur_task_pos > len(self._task_list) - 1: - run_state = ITERATING_COMPLETE - else: - run_state = ITERATING_TASKS - else: - task = cur_block.always[cur_always_pos] - cur_always_pos += 1 - break - - elif run_state == ITERATING_COMPLETE: - # done iterating, return None to signify that - return None - - if task._role: - # if we had a current role, mark that role as completed - if cur_role and task._role != cur_role and not peek: - cur_role._completed = True - - cur_role = task._role - - # if the current role has not had its task run flag set, mark - # clear the completed flag so we can correctly determine if the - # role was run - if not cur_role._had_task_run and not peek: - cur_role._completed = False - - # If we're not just peeking at the next task, save the internal state - if not peek: - self._run_state = run_state - self._failed_state = failed_state - self._cur_block = cur_block - self._cur_role = cur_role - self._cur_task_pos = cur_task_pos - self._cur_rescue_pos = cur_rescue_pos - self._cur_always_pos = cur_always_pos - self._cur_handler_pos = cur_handler_pos - - return task - - def mark_failed(self): - ''' - Escalates the failed state relative to the running state. - ''' - if self._run_state == ITERATING_SETUP: - self._failed_state = FAILED_SETUP - elif self._run_state == ITERATING_TASKS: - self._failed_state = FAILED_TASKS - elif self._run_state == ITERATING_RESCUE: - self._failed_state = FAILED_RESCUE - elif self._run_state == ITERATING_ALWAYS: - self._failed_state = FAILED_ALWAYS - - def add_tasks(self, task_list): - if self._run_state == ITERATING_TASKS: - before = self._task_list[:self._cur_task_pos] - after = self._task_list[self._cur_task_pos:] - self._task_list = before + task_list + after - elif self._run_state == ITERATING_RESCUE: - before = self._cur_block.rescue[:self._cur_rescue_pos] - after = self._cur_block.rescue[self._cur_rescue_pos:] - self._cur_block.rescue = before + task_list + after - elif self._run_state == ITERATING_ALWAYS: - before = self._cur_block.always[:self._cur_always_pos] - after = self._cur_block.always[self._cur_always_pos:] - self._cur_block.always = before + task_list + after + def copy(self): + new_state = HostState(self._blocks) + new_state.cur_block = self.cur_block + new_state.cur_regular_task = self.cur_regular_task + new_state.cur_rescue_task = self.cur_rescue_task + new_state.cur_always_task = self.cur_always_task + new_state.cur_role = self.cur_role + new_state.run_state = self.run_state + new_state.fail_state = self.fail_state + new_state.pending_setup = self.pending_setup + return new_state class PlayIterator: + # the primary running states for the play iteration + ITERATING_SETUP = 0 + ITERATING_TASKS = 1 + ITERATING_RESCUE = 2 + ITERATING_ALWAYS = 3 + ITERATING_COMPLETE = 4 - ''' - The main iterator class, which keeps the state of the playbook - on a per-host basis using the above PlaybookState class. - ''' + # the failure states for the play iteration, which are powers + # of 2 as they may be or'ed together in certain circumstances + FAILED_NONE = 0 + FAILED_SETUP = 1 + FAILED_TASKS = 2 + FAILED_RESCUE = 4 + FAILED_ALWAYS = 8 def __init__(self, inventory, play): - self._play = play - self._inventory = inventory - self._host_entries = dict() - self._first_host = None + # FIXME: should we save the post_validated play from below here instead? + self._play = play - # Build the per-host dictionary of playbook states, using a copy - # of the play object so we can post_validate it to ensure any templated - # fields are filled in without modifying the original object, since - # post_validate() saves the templated values. - - # FIXME: this is a hacky way of doing this, the iterator should - # instead get the loader and variable manager directly - # as args to __init__ + # post validate the play, as we need some fields to be finalized now + # so that we can use them to setup the iterator properly all_vars = inventory._variable_manager.get_vars(loader=inventory._loader, play=play) new_play = play.copy() new_play.post_validate(all_vars, fail_on_undefined=False) - self._task_list = new_play.compile() + self._blocks = new_play.compile() + self._host_states = {} for host in inventory.get_hosts(new_play.hosts): - if self._first_host is None: - self._first_host = host - self._host_entries[host.get_name()] = PlayState(parent_iterator=self, host=host) + self._host_states[host.name] = HostState(blocks=self._blocks) - # FIXME: remove, probably not required anymore - #def get_next_task(self, peek=False): - # ''' returns the next task for host[0] ''' - # - # first_entry = self._host_entries[self._first_host.get_name()] - # if not peek: - # for entry in self._host_entries: - # if entry != self._first_host.get_name(): - # target_entry = self._host_entries[entry] - # if target_entry._cur_task_pos == first_entry._cur_task_pos: - # target_entry.next() - # return first_entry.next(peek=peek) - - def get_next_task_for_host(self, host, peek=False): - ''' fetch the next task for the given host ''' - if host.get_name() not in self._host_entries: + def get_host_state(self, host): + try: + return self._host_states[host.name].copy() + except KeyError: raise AnsibleError("invalid host (%s) specified for playbook iteration" % host) - return self._host_entries[host.get_name()].next(peek=peek) + def get_next_task_for_host(self, host, peek=False, lock_step=True): + s = self.get_host_state(host) + + task = None + if s.run_state == self.ITERATING_COMPLETE: + return None + else: + while True: + try: + cur_block = s._blocks[s.cur_block] + except IndexError: + s.run_state = self.ITERATING_COMPLETE + break + + if s.run_state == self.ITERATING_SETUP: + s.run_state = self.ITERATING_TASKS + if self._play._gather_facts == 'smart' and not host.gathered_facts or boolean(self._play._gather_facts): + # mark the host as having gathered facts + host.set_gathered_facts(True) + + task = Task() + task.action = 'setup' + task.set_loader(self._play._loader) + + elif s.run_state == self.ITERATING_TASKS: + # clear the pending setup flag, since we're past that and it didn't fail + if s.pending_setup: + s.pending_setup = False + + if s.fail_state & self.FAILED_TASKS == self.FAILED_TASKS: + s.run_state = self.ITERATING_RESCUE + elif s.cur_regular_task >= len(cur_block.block): + s.run_state = self.ITERATING_ALWAYS + else: + task = cur_block.block[s.cur_regular_task] + s.cur_regular_task += 1 + break + elif s.run_state == self.ITERATING_RESCUE: + if s.fail_state & self.FAILED_RESCUE == self.FAILED_RESCUE: + s.run_state = self.ITERATING_ALWAYS + elif s.cur_rescue_task >= len(cur_block.rescue): + if len(cur_block.rescue) > 0: + s.fail_state = self.FAILED_NONE + s.run_state = self.ITERATING_ALWAYS + else: + task = cur_block.rescue[s.cur_rescue_task] + s.cur_rescue_task += 1 + break + elif s.run_state == self.ITERATING_ALWAYS: + if s.cur_always_task >= len(cur_block.always): + if s.fail_state != self.FAILED_NONE: + s.run_state = self.ITERATING_COMPLETE + break + else: + s.cur_block += 1 + s.cur_regular_task = 0 + s.cur_rescue_task = 0 + s.cur_always_task = 0 + s.run_state = self.ITERATING_TASKS + else: + task= cur_block.always[s.cur_always_task] + s.cur_always_task += 1 + break + + if task and task._role: + # if we had a current role, mark that role as completed + if s.cur_role and task._role != s.cur_role and s.cur_role._had_task_run and not peek: + s.cur_role._completed = True + + s.cur_role = task._role + + if not peek: + self._host_states[host.name] = s + + return (s, task) def mark_host_failed(self, host): - ''' mark the given host as failed ''' - if host.get_name() not in self._host_entries: - raise AnsibleError("invalid host (%s) specified for playbook iteration" % host) + s = self.get_host_state(host) + if s.pending_setup: + s.fail_state |= self.FAILED_SETUP + s.run_state = self.ITERATING_COMPLETE + elif s.run_state == self.ITERATING_TASKS: + s.fail_state |= self.FAILED_TASKS + s.run_state = self.ITERATING_RESCUE + elif s.run_state == self.ITERATING_RESCUE: + s.fail_state |= self.FAILED_RESCUE + s.run_state = self.ITERATING_ALWAYS + elif s.run_state == self.ITERATING_ALWAYS: + s.fail_state |= self.FAILED_ALWAYS + s.run_state = self.ITERATING_COMPLETE + self._host_states[host.name] = s - self._host_entries[host.get_name()].mark_failed() + def get_failed_hosts(self): + return dict((host, True) for (host, state) in self._host_states.iteritems() if state.run_state == self.ITERATING_COMPLETE and state.failed_state != self.FAILED_NONE) - def get_original_task(self, task): + def get_original_task(self, host, task): ''' Finds the task in the task list which matches the UUID of the given task. The executor engine serializes/deserializes objects as they are passed through the different processes, and not all data structures are preserved. This method allows us to find the original task passed into the executor engine. ''' - - for t in self._task_list: - if t._uuid == task._uuid: - return t - + for block in self._blocks: + if block.block: + for t in block.block: + if t._uuid == task._uuid: + return t + if block.rescue: + for t in block.rescue: + if t._uuid == task._uuid: + return t + if block.always: + for t in block.always: + if t._uuid == task._uuid: + return t return None - def add_tasks(self, host, task_list): - if host.name not in self._host_entries: - raise AnsibleError("invalid host (%s) specified for playbook iteration (expanding task list)" % host) + def add_tasks(self, task_list): + if self._run_state == self.ITERATING_TASKS: + before = self._task_list[:self._cur_task_pos + self._tasks_added] + after = self._task_list[self._cur_task_pos + self._tasks_added:] + self._task_list = before + task_list + after + elif self._run_state == self.ITERATING_RESCUE: + before = self._cur_block.rescue[:self._cur_rescue_pos + self._tasks_added] + after = self._cur_block.rescue[self._cur_rescue_pos + self._tasks_added:] + self._cur_block.rescue = before + task_list + after + elif self._run_state == self.ITERATING_ALWAYS: + before = self._cur_block.always[:self._cur_always_pos + self._tasks_added] + after = self._cur_block.always[self._cur_always_pos + self._tasks_added:] + self._cur_block.always = before + task_list + after + + # set this internal flag now so we know if + self._tasks_added += len(task_list) - self._host_entries[host.name].add_tasks(task_list) diff --git a/v2/ansible/executor/process/result.py b/v2/ansible/executor/process/result.py index f794fab58c9..761db21fe69 100644 --- a/v2/ansible/executor/process/result.py +++ b/v2/ansible/executor/process/result.py @@ -137,12 +137,13 @@ class ResultProcess(multiprocessing.Process): result_items = [ result._result ] for result_item in result_items: - if 'include' in result_item: - include_variables = result_item.get('include_variables', dict()) - if 'item' in result_item: - include_variables['item'] = result_item['item'] - self._send_result(('include', result._host, result._task, result_item['include'], include_variables)) - elif 'add_host' in result_item: + #if 'include' in result_item: + # include_variables = result_item.get('include_variables', dict()) + # if 'item' in result_item: + # include_variables['item'] = result_item['item'] + # self._send_result(('include', result._host, result._task, result_item['include'], include_variables)) + #elif 'add_host' in result_item: + if 'add_host' in result_item: # this task added a new host (add_host module) self._send_result(('add_host', result_item)) elif 'add_group' in result_item: diff --git a/v2/ansible/executor/task_executor.py b/v2/ansible/executor/task_executor.py index b519b3c3263..99beeeae66f 100644 --- a/v2/ansible/executor/task_executor.py +++ b/v2/ansible/executor/task_executor.py @@ -132,13 +132,14 @@ class TaskExecutor: res = self._execute(variables=task_vars) (self._task, tmp_task) = (tmp_task, self._task) - # FIXME: we should be sending back a callback result for each item in the loop here - # now update the result with the item info, and append the result # to the list of results res['item'] = item results.append(res) + # FIXME: we should be sending back a callback result for each item in the loop here + print(res) + return results def _squash_items(self, items, variables): diff --git a/v2/ansible/executor/task_queue_manager.py b/v2/ansible/executor/task_queue_manager.py index 72ff04d53dc..7c77f8e3a70 100644 --- a/v2/ansible/executor/task_queue_manager.py +++ b/v2/ansible/executor/task_queue_manager.py @@ -26,7 +26,6 @@ import sys from ansible.errors import AnsibleError from ansible.executor.connection_info import ConnectionInformation -#from ansible.executor.manager import AnsibleManager from ansible.executor.play_iterator import PlayIterator from ansible.executor.process.worker import WorkerProcess from ansible.executor.process.result import ResultProcess @@ -36,7 +35,6 @@ from ansible.utils.debug import debug __all__ = ['TaskQueueManager'] - class TaskQueueManager: ''' @@ -59,10 +57,6 @@ class TaskQueueManager: # a special flag to help us exit cleanly self._terminated = False - # create and start the multiprocessing manager - #self._manager = AnsibleManager() - #self._manager.start() - # this dictionary is used to keep track of notified handlers self._notified_handlers = dict() diff --git a/v2/ansible/playbook/base.py b/v2/ansible/playbook/base.py index 7da51c24b94..691de0c9f0f 100644 --- a/v2/ansible/playbook/base.py +++ b/v2/ansible/playbook/base.py @@ -273,7 +273,7 @@ class Base: if needle in self._attributes: return self._attributes[needle] - raise AttributeError("attribute not found: %s" % needle) + raise AttributeError("attribute not found in %s: %s" % (self.__class__.__name__, needle)) def __getstate__(self): return self.serialize() diff --git a/v2/ansible/playbook/block.py b/v2/ansible/playbook/block.py index c701e90b7f3..533b552f22e 100644 --- a/v2/ansible/playbook/block.py +++ b/v2/ansible/playbook/block.py @@ -28,9 +28,9 @@ from ansible.playbook.taggable import Taggable class Block(Base, Conditional, Taggable): - _block = FieldAttribute(isa='list') - _rescue = FieldAttribute(isa='list') - _always = FieldAttribute(isa='list') + _block = FieldAttribute(isa='list', default=[]) + _rescue = FieldAttribute(isa='list', default=[]) + _always = FieldAttribute(isa='list', default=[]) # for future consideration? this would be functionally # similar to the 'else' clause for exceptions @@ -41,6 +41,7 @@ class Block(Base, Conditional, Taggable): self._role = role self._task_include = task_include self._use_handlers = use_handlers + self._dep_chain = [] super(Block, self).__init__() @@ -141,6 +142,7 @@ class Block(Base, Conditional, Taggable): def copy(self): new_me = super(Block, self).copy() new_me._use_handlers = self._use_handlers + new_me._dep_chain = self._dep_chain[:] new_me._parent_block = None if self._parent_block: @@ -163,6 +165,7 @@ class Block(Base, Conditional, Taggable): ''' data = dict(when=self.when) + data['dep_chain'] = self._dep_chain if self._role is not None: data['role'] = self._role.serialize() @@ -177,11 +180,11 @@ class Block(Base, Conditional, Taggable): serialize method ''' - #from ansible.playbook.task_include import TaskInclude from ansible.playbook.task import Task # unpack the when attribute, which is the only one we want self.when = data.get('when') + self._dep_chain = data.get('dep_chain', []) # if there was a serialized role, unpack it too role_data = data.get('role') @@ -198,6 +201,10 @@ class Block(Base, Conditional, Taggable): self._task_include = ti def evaluate_conditional(self, all_vars): + if len(self._dep_chain): + for dep in self._dep_chain: + if not dep.evaluate_conditional(all_vars): + return False if self._task_include is not None: if not self._task_include.evaluate_conditional(all_vars): return False @@ -211,6 +218,9 @@ class Block(Base, Conditional, Taggable): def evaluate_tags(self, only_tags, skip_tags, all_vars): result = False + if len(self._dep_chain): + for dep in self._dep_chain: + result |= dep.evaluate_tags(only_tags=only_tags, skip_tags=skip_tags, all_vars=all_vars) if self._parent_block is not None: result |= self._parent_block.evaluate_tags(only_tags=only_tags, skip_tags=skip_tags, all_vars=all_vars) elif self._role is not None: @@ -227,3 +237,6 @@ class Block(Base, Conditional, Taggable): if self._task_include: self._task_include.set_loader(loader) + for dep in self._dep_chain: + dep.set_loader(loader) + diff --git a/v2/ansible/playbook/play.py b/v2/ansible/playbook/play.py index 3de550b23c7..29c9c04cc8e 100644 --- a/v2/ansible/playbook/play.py +++ b/v2/ansible/playbook/play.py @@ -198,13 +198,13 @@ class Play(Base, Taggable): the parent role R last. This is done for all roles in the Play. ''' - task_list = [] + block_list = [] if len(self.roles) > 0: for r in self.roles: - task_list.extend(r.compile()) + block_list.extend(r.compile()) - return task_list + return block_list def compile(self): ''' @@ -213,14 +213,14 @@ class Play(Base, Taggable): tasks specified in the play. ''' - task_list = [] + block_list = [] - task_list.extend(compile_block_list(self.pre_tasks)) - task_list.extend(self._compile_roles()) - task_list.extend(compile_block_list(self.tasks)) - task_list.extend(compile_block_list(self.post_tasks)) + block_list.extend(self.pre_tasks) + block_list.extend(self._compile_roles()) + block_list.extend(self.tasks) + block_list.extend(self.post_tasks) - return task_list + return block_list def get_vars(self): return self.vars.copy() diff --git a/v2/ansible/playbook/role/__init__.py b/v2/ansible/playbook/role/__init__.py index 0bdab79946c..dfb1f70addf 100644 --- a/v2/ansible/playbook/role/__init__.py +++ b/v2/ansible/playbook/role/__init__.py @@ -304,25 +304,25 @@ class Role(Base, Conditional, Taggable): can correctly take their parent's tags/conditionals into account. ''' - task_list = [] + block_list = [] # update the dependency chain here new_dep_chain = dep_chain + [self] deps = self.get_direct_dependencies() for dep in deps: - dep_tasks = dep.compile(dep_chain=new_dep_chain) - for dep_task in dep_tasks: + dep_blocks = dep.compile(dep_chain=new_dep_chain) + for dep_block in dep_blocks: # since we're modifying the task, and need it to be unique, # we make a copy of it here and assign the dependency chain # to the copy, then append the copy to the task list. - new_dep_task = dep_task.copy() - new_dep_task._dep_chain = new_dep_chain - task_list.append(new_dep_task) + new_dep_block = dep_block.copy() + new_dep_block._dep_chain = new_dep_chain + block_list.append(new_dep_block) - task_list.extend(compile_block_list(self._task_blocks)) + block_list.extend(self._task_blocks) - return task_list + return block_list def serialize(self, include_deps=True): res = super(Role, self).serialize() diff --git a/v2/ansible/playbook/task.py b/v2/ansible/playbook/task.py index a9b6a205897..e6fcc13d259 100644 --- a/v2/ansible/playbook/task.py +++ b/v2/ansible/playbook/task.py @@ -102,7 +102,6 @@ class Task(Base, Conditional, Taggable): self._block = block self._role = role self._task_include = task_include - self._dep_chain = [] super(Task, self).__init__() @@ -226,7 +225,6 @@ class Task(Base, Conditional, Taggable): def copy(self): new_me = super(Task, self).copy() - new_me._dep_chain = self._dep_chain[:] new_me._block = None if self._block: @@ -244,7 +242,6 @@ class Task(Base, Conditional, Taggable): def serialize(self): data = super(Task, self).serialize() - data['dep_chain'] = self._dep_chain if self._block: data['block'] = self._block.serialize() @@ -263,7 +260,6 @@ class Task(Base, Conditional, Taggable): #from ansible.playbook.task_include import TaskInclude block_data = data.get('block') - self._dep_chain = data.get('dep_chain', []) if block_data: b = Block() @@ -289,10 +285,6 @@ class Task(Base, Conditional, Taggable): super(Task, self).deserialize(data) def evaluate_conditional(self, all_vars): - if len(self._dep_chain): - for dep in self._dep_chain: - if not dep.evaluate_conditional(all_vars): - return False if self._block is not None: if not self._block.evaluate_conditional(all_vars): return False @@ -303,9 +295,6 @@ class Task(Base, Conditional, Taggable): def evaluate_tags(self, only_tags, skip_tags, all_vars): result = False - if len(self._dep_chain): - for dep in self._dep_chain: - result |= dep.evaluate_tags(only_tags=only_tags, skip_tags=skip_tags, all_vars=all_vars) if self._block is not None: result |= self._block.evaluate_tags(only_tags=only_tags, skip_tags=skip_tags, all_vars=all_vars) return result | super(Task, self).evaluate_tags(only_tags=only_tags, skip_tags=skip_tags, all_vars=all_vars) @@ -324,5 +313,3 @@ class Task(Base, Conditional, Taggable): if self._task_include: self._task_include.set_loader(loader) - for dep in self._dep_chain: - dep.set_loader(loader) diff --git a/v2/ansible/plugins/strategies/__init__.py b/v2/ansible/plugins/strategies/__init__.py index a766f150176..a6f382289b6 100644 --- a/v2/ansible/plugins/strategies/__init__.py +++ b/v2/ansible/plugins/strategies/__init__.py @@ -68,8 +68,8 @@ class StrategyBase: num_failed = len(self._tqm._failed_hosts) num_unreachable = len(self._tqm._unreachable_hosts) - debug("running the cleanup portion of the play") - result &= self.cleanup(iterator, connection_info) + #debug("running the cleanup portion of the play") + #result &= self.cleanup(iterator, connection_info) debug("running handlers") result &= self.run_handlers(iterator, connection_info) @@ -131,6 +131,7 @@ class StrategyBase: if result[0] == 'host_task_failed': if not task.ignore_errors: debug("marking %s as failed" % host.get_name()) + iterator.mark_host_failed(host) self._tqm._failed_hosts[host.get_name()] = True self._callback.runner_on_failed(task, task_result) elif result[0] == 'host_unreachable': @@ -151,26 +152,25 @@ class StrategyBase: # lookup the role in the ROLE_CACHE to make sure we're dealing # with the correct object and mark it as executed for (entry, role_obj) in ROLE_CACHE[task_result._task._role._role_name].iteritems(): - #hashed_entry = frozenset(task_result._task._role._role_params.iteritems()) hashed_entry = hash_params(task_result._task._role._role_params) if entry == hashed_entry : role_obj._had_task_run = True - elif result[0] == 'include': - host = result[1] - task = result[2] - include_file = result[3] - include_vars = result[4] - - if isinstance(task, Handler): - # FIXME: figure out how to make includes work for handlers - pass - else: - original_task = iterator.get_original_task(task) - if original_task._role: - include_file = self._loader.path_dwim_relative(original_task._role._role_path, 'tasks', include_file) - new_tasks = self._load_included_file(original_task, include_file, include_vars) - iterator.add_tasks(host, new_tasks) + #elif result[0] == 'include': + # host = result[1] + # task = result[2] + # include_file = result[3] + # include_vars = result[4] + # + # if isinstance(task, Handler): + # # FIXME: figure out how to make includes work for handlers + # pass + # else: + # original_task = iterator.get_original_task(host, task) + # if original_task and original_task._role: + # include_file = self._loader.path_dwim_relative(original_task._role._role_path, 'tasks', include_file) + # new_tasks = self._load_included_file(original_task, include_file, include_vars) + # iterator.add_tasks(host, new_tasks) elif result[0] == 'add_host': task_result = result[1] @@ -314,6 +314,8 @@ class StrategyBase: task_list = compile_block_list(block_list) + + # set the vars for this task from those specified as params to the include for t in task_list: t.vars = include_vars.copy() @@ -355,18 +357,21 @@ class StrategyBase: iterator.mark_host_failed(host) del self._tqm._failed_hosts[host_name] - if host_name not in self._tqm._unreachable_hosts and iterator.get_next_task_for_host(host, peek=True): + if host_name in self._blocked_hosts: work_to_do = True - # check to see if this host is blocked (still executing a previous task) - if not host_name in self._blocked_hosts: - # pop the task, mark the host blocked, and queue it - self._blocked_hosts[host_name] = True - task = iterator.get_next_task_for_host(host) - task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task) - self._callback.playbook_on_cleanup_task_start(task.get_name()) - self._queue_task(host, task, task_vars, connection_info) + continue + elif iterator.get_next_task_for_host(host, peek=True) and host_name not in self._tqm._unreachable_hosts: + work_to_do = True + + # pop the task, mark the host blocked, and queue it + self._blocked_hosts[host_name] = True + task = iterator.get_next_task_for_host(host) + task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task) + self._callback.playbook_on_cleanup_task_start(task.get_name()) + self._queue_task(host, task, task_vars, connection_info) self._process_pending_results(iterator) + time.sleep(0.01) # no more work, wait until the queue is drained self._wait_on_pending_results(iterator) diff --git a/v2/ansible/plugins/strategies/linear.py b/v2/ansible/plugins/strategies/linear.py index afb7c608538..1725145d74f 100644 --- a/v2/ansible/plugins/strategies/linear.py +++ b/v2/ansible/plugins/strategies/linear.py @@ -20,11 +20,96 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type from ansible.errors import AnsibleError +from ansible.executor.play_iterator import PlayIterator +from ansible.playbook.task import Task from ansible.plugins.strategies import StrategyBase from ansible.utils.debug import debug class StrategyModule(StrategyBase): + def _get_next_task_lockstep(self, hosts, iterator): + ''' + Returns a list of (host, task) tuples, where the task may + be a noop task to keep the iterator in lock step across + all hosts. + ''' + + noop_task = Task() + noop_task.action = 'meta' + noop_task.args['_raw_params'] = 'noop' + noop_task.set_loader(iterator._play._loader) + + host_tasks = {} + for host in hosts: + host_tasks[host.name] = iterator.get_next_task_for_host(host, peek=True) + + num_setups = 0 + num_tasks = 0 + num_rescue = 0 + num_always = 0 + + lowest_cur_block = len(iterator._blocks) + + for (k, v) in host_tasks.iteritems(): + (s, t) = v + if s.cur_block < lowest_cur_block and s.run_state != PlayIterator.ITERATING_COMPLETE: + lowest_cur_block = s.cur_block + + if s.run_state == PlayIterator.ITERATING_SETUP: + num_setups += 1 + elif s.run_state == PlayIterator.ITERATING_TASKS: + num_tasks += 1 + elif s.run_state == PlayIterator.ITERATING_RESCUE: + num_rescue += 1 + elif s.run_state == PlayIterator.ITERATING_ALWAYS: + num_always += 1 + + def _advance_selected_hosts(hosts, cur_block, cur_state): + ''' + This helper returns the task for all hosts in the requested + state, otherwise they get a noop dummy task. This also advances + the state of the host, since the given states are determined + while using peek=True. + ''' + # we return the values in the order they were originally + # specified in the given hosts array + rvals = [] + for host in hosts: + (s, t) = host_tasks[host.name] + if s.run_state == cur_state and s.cur_block == cur_block: + new_t = iterator.get_next_task_for_host(host) + #if new_t != t: + # raise AnsibleError("iterator error, wtf?") + rvals.append((host, t)) + else: + rvals.append((host, noop_task)) + return rvals + + # if any hosts are in ITERATING_SETUP, return the setup task + # while all other hosts get a noop + if num_setups: + return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_SETUP) + + # if any hosts are in ITERATING_TASKS, return the next normal + # task for these hosts, while all other hosts get a noop + if num_tasks: + return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_TASKS) + + # if any hosts are in ITERATING_RESCUE, return the next rescue + # task for these hosts, while all other hosts get a noop + if num_rescue: + return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_RESCUE) + + # if any hosts are in ITERATING_ALWAYS, return the next always + # task for these hosts, while all other hosts get a noop + if num_always: + return _advance_selected_hosts(hosts, lowest_cur_block, PlayIterator.ITERATING_ALWAYS) + + # at this point, everything must be ITERATING_COMPLETE, so we + # return None for all hosts in the list + return [(host, None) for host in hosts] + + def run(self, iterator, connection_info): ''' The linear strategy is simple - get the next task and queue @@ -40,6 +125,7 @@ class StrategyModule(StrategyBase): try: debug("getting the remaining hosts for this loop") + self._tqm._failed_hosts = iterator.get_failed_hosts() hosts_left = self.get_hosts_remaining(iterator._play) debug("done getting the remaining hosts for this loop") if len(hosts_left) == 0: @@ -51,40 +137,39 @@ class StrategyModule(StrategyBase): # queue up this task for each host in the inventory callback_sent = False work_to_do = False - for host in hosts_left: - while True: - task = iterator.get_next_task_for_host(host) - if not task: - break - - debug("getting variables") - task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task) - debug("done getting variables") - - # check to see if this task should be skipped, due to it being a member of a - # role which has already run (and whether that role allows duplicate execution) - if task._role and task._role.has_run(): - # If there is no metadata, the default behavior is to not allow duplicates, - # if there is metadata, check to see if the allow_duplicates flag was set to true - if task._role._metadata is None or task._role._metadata and not task._role._metadata.allow_duplicates: - debug("'%s' skipped because role has already run" % task) - continue - - if not task.evaluate_tags(connection_info.only_tags, connection_info.skip_tags, task_vars) and task.action != 'setup': - debug("'%s' failed tag evaluation" % task) - continue - - break + host_tasks = self._get_next_task_lockstep(hosts_left, iterator) + for (host, task) in host_tasks: if not task: continue work_to_do = True + + debug("getting variables") + task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task) + debug("done getting variables") + + # check to see if this task should be skipped, due to it being a member of a + # role which has already run (and whether that role allows duplicate execution) + if task._role and task._role.has_run(): + # If there is no metadata, the default behavior is to not allow duplicates, + # if there is metadata, check to see if the allow_duplicates flag was set to true + if task._role._metadata is None or task._role._metadata and not task._role._metadata.allow_duplicates: + debug("'%s' skipped because role has already run" % task) + continue + + if not task.evaluate_tags(connection_info.only_tags, connection_info.skip_tags, task_vars) and task.action != 'setup': + debug("'%s' failed tag evaluation" % task) + continue + if task.action == 'meta': # meta tasks store their args in the _raw_params field of args, # since they do not use k=v pairs, so get that meta_action = task.args.get('_raw_params') - if meta_action == 'flush_handlers': + if meta_action == 'noop': + # FIXME: issue a callback for the noop here? + continue + elif meta_action == 'flush_handlers': self.run_handlers(iterator, connection_info) else: raise AnsibleError("invalid meta action requested: %s" % meta_action, obj=task._ds) @@ -100,6 +185,11 @@ class StrategyModule(StrategyBase): debug("done queuing things up, now waiting for results queue to drain") self._wait_on_pending_results(iterator) + + # FIXME: MAKE PENDING RESULTS RETURN RESULTS PROCESSED AND USE THEM + # TO TAKE ACTION, ie. FOR INCLUDE STATEMENTS TO PRESERVE THE + # LOCK STEP OPERATION + debug("results queue empty") except (IOError, EOFError), e: debug("got IOError/EOFError in task loop: %s" % e) diff --git a/v2/samples/include.yml b/v2/samples/include.yml new file mode 100644 index 00000000000..2ffdc3dd765 --- /dev/null +++ b/v2/samples/include.yml @@ -0,0 +1,4 @@ +- debug: msg="this is the include, a=={{a}}" +- debug: msg="this is the second debug in the include" +- debug: msg="this is the third debug in the include, and a is still {{a}}" + diff --git a/v2/samples/localhosts b/v2/samples/localhosts new file mode 100644 index 00000000000..ff893524f45 --- /dev/null +++ b/v2/samples/localhosts @@ -0,0 +1,3 @@ +l1 ansible_ssh_host=127.0.0.1 +l2 ansible_ssh_host=127.0.0.2 +l3 ansible_ssh_host=127.0.0.3 diff --git a/v2/samples/roles/common/meta/main.yml b/v2/samples/roles/common/meta/main.yml new file mode 100644 index 00000000000..e69de29bb2d diff --git a/v2/samples/roles/common/tasks/main.yml b/v2/samples/roles/common/tasks/main.yml new file mode 100644 index 00000000000..abe6960707a --- /dev/null +++ b/v2/samples/roles/common/tasks/main.yml @@ -0,0 +1 @@ +- debug: msg="this is a task from the common role" diff --git a/v2/samples/roles/role_a/meta/main.yml b/v2/samples/roles/role_a/meta/main.yml new file mode 100644 index 00000000000..4a41c133d76 --- /dev/null +++ b/v2/samples/roles/role_a/meta/main.yml @@ -0,0 +1,2 @@ +dependencies: +- common diff --git a/v2/samples/roles/role_a/tasks/main.yml b/v2/samples/roles/role_a/tasks/main.yml new file mode 100644 index 00000000000..b5b28331a16 --- /dev/null +++ b/v2/samples/roles/role_a/tasks/main.yml @@ -0,0 +1 @@ +- debug: msg="this is a task from role A" diff --git a/v2/samples/roles/role_b/meta/main.yml b/v2/samples/roles/role_b/meta/main.yml new file mode 100644 index 00000000000..4a41c133d76 --- /dev/null +++ b/v2/samples/roles/role_b/meta/main.yml @@ -0,0 +1,2 @@ +dependencies: +- common diff --git a/v2/samples/roles/role_b/tasks/main.yml b/v2/samples/roles/role_b/tasks/main.yml new file mode 100644 index 00000000000..28a158227c2 --- /dev/null +++ b/v2/samples/roles/role_b/tasks/main.yml @@ -0,0 +1 @@ +- debug: msg="this is a task from role B" diff --git a/v2/samples/roles/test_role/meta/main.yml b/v2/samples/roles/test_role/meta/main.yml new file mode 100644 index 00000000000..bc0f245cc8a --- /dev/null +++ b/v2/samples/roles/test_role/meta/main.yml @@ -0,0 +1,2 @@ +dependencies: + - test_role_dep diff --git a/v2/samples/roles/test_role/tasks/main.yml b/v2/samples/roles/test_role/tasks/main.yml index ea0160bc16a..c0f2a9f041e 100644 --- a/v2/samples/roles/test_role/tasks/main.yml +++ b/v2/samples/roles/test_role/tasks/main.yml @@ -1 +1,3 @@ - debug: msg="here we are in the role, foo={{foo}}" +- fail: + when: foo != "bar" diff --git a/v2/samples/roles/test_role_dep/tasks/main.yml b/v2/samples/roles/test_role_dep/tasks/main.yml new file mode 100644 index 00000000000..b319918f38e --- /dev/null +++ b/v2/samples/roles/test_role_dep/tasks/main.yml @@ -0,0 +1 @@ +- debug: msg="here we are in the role dependency" diff --git a/v2/samples/test_block.yml b/v2/samples/test_block.yml index ae1f2224448..25c90030823 100644 --- a/v2/samples/test_block.yml +++ b/v2/samples/test_block.yml @@ -1,17 +1,20 @@ -- hosts: localhost +- hosts: all connection: local - gather_facts: no + gather_facts: yes tasks: - block: - - command: /bin/false - - debug: msg="you shouldn't see me" + - debug: msg="this is the first task" + - fail: + when: inventory_hostname == "l2" + - debug: msg="only l1 and l3 should see me" rescue: - debug: msg="this is the rescue" - command: /bin/false - - debug: msg="you shouldn't see this either" + - debug: msg="no host should see this run" always: - debug: msg="this is the always block, it will always be seen" when: foo|default('') != "some value" tags: - foo - bar + - debug: msg="you should only see l1 and l3 run this" diff --git a/v2/samples/test_blocks_of_blocks.yml b/v2/samples/test_blocks_of_blocks.yml new file mode 100644 index 00000000000..8092a9ad8b3 --- /dev/null +++ b/v2/samples/test_blocks_of_blocks.yml @@ -0,0 +1,8 @@ +- hosts: localhost + gather_facts: no + tasks: + - block: + - block: + - block: + - block: + - debug: msg="are we there yet?" diff --git a/v2/samples/test_include.yml b/v2/samples/test_include.yml new file mode 100644 index 00000000000..6a47f275f43 --- /dev/null +++ b/v2/samples/test_include.yml @@ -0,0 +1,26 @@ +- hosts: localhost + gather_facts: no + tasks: + - block: + - include: include.yml + when: 1 == 2 + - include: include.yml a=1 + when: 1 == 1 + notify: foo + - include: include.yml a={{item}} + with_items: + - foo + - bar + - bam + - fail: + + #rescue: + #- include: include.yml a=rescue + + always: + - include: include.yml a=always + + handlers: + - name: foo + include: include.yml a="this is a handler" + diff --git a/v2/samples/test_role.yml b/v2/samples/test_role.yml index 114fd5a489c..a5ca620c54c 100644 --- a/v2/samples/test_role.yml +++ b/v2/samples/test_role.yml @@ -1,4 +1,5 @@ -- hosts: ubuntu1404 +- hosts: localhost + connection: local gather_facts: no vars: foo: "BAD!!" diff --git a/v2/samples/test_roles_complex.yml b/v2/samples/test_roles_complex.yml new file mode 100644 index 00000000000..8359dd2bc7f --- /dev/null +++ b/v2/samples/test_roles_complex.yml @@ -0,0 +1,6 @@ +- hosts: localhost + gather_facts: no + roles: + - { role: role_a, tags: A, when: skip != "A" } + - { role: role_b, tags: B, when: skip != "B" } +