From db1d5b154a2ea2ebcd659729039b433909297af4 Mon Sep 17 00:00:00 2001 From: Michael DeHaan Date: Fri, 25 May 2012 19:18:02 -0400 Subject: [PATCH] Fix casing/underscore convention in method name, split polling logic away from runner. --- bin/ansible | 2 +- lib/ansible/playbook.py | 2 +- lib/ansible/poller.py | 99 +++++++++++++++++++++++++++++++++++++++++ lib/ansible/runner.py | 8 ++-- 4 files changed, 104 insertions(+), 7 deletions(-) create mode 100644 lib/ansible/poller.py diff --git a/bin/ansible b/bin/ansible index d4f57442420..5304ac3c7f3 100755 --- a/bin/ansible +++ b/bin/ansible @@ -101,7 +101,7 @@ class Cli(object): if options.seconds: print "background launch...\n\n" - results, poller = runner.runAsync(options.seconds) + results, poller = runner.run_async(options.seconds) results = self.poll_while_needed(poller, options) else: results = runner.run() diff --git a/lib/ansible/playbook.py b/lib/ansible/playbook.py index 0f9acc29a34..86868f6b264 100644 --- a/lib/ansible/playbook.py +++ b/lib/ansible/playbook.py @@ -295,7 +295,7 @@ class PlayBook(object): if async_seconds == 0: results = runner.run() else: - results, poller = runner.runAsync(async_seconds) + results, poller = runner.run_async(async_seconds) self.stats.compute(results) if async_poll_interval > 0: # if not polling, playbook requested fire and forget diff --git a/lib/ansible/poller.py b/lib/ansible/poller.py new file mode 100644 index 00000000000..c030d01dad1 --- /dev/null +++ b/lib/ansible/poller.py @@ -0,0 +1,99 @@ +# (c) 2012, Michael DeHaan +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . +# + +import time + +class AsyncPoller(object): + """ Manage asynchronous jobs. """ + + def __init__(self, results, runner): + self.runner = runner + + self.results = { 'contacted': {}, 'dark': {}} + self.hosts_to_poll = [] + self.completed = False + + # Get job id and which hosts to poll again in the future + jid = None + for (host, res) in results['contacted'].iteritems(): + if res.get('started', False): + self.hosts_to_poll.append(host) + jid = res.get('ansible_job_id', None) + else: + self.results['contacted'][host] = res + for (host, res) in results['dark'].iteritems(): + self.results['dark'][host] = res + + if jid is None: + raise errors.AnsibleError("unexpected error: unable to determine jid") + if len(self.hosts_to_poll)==0: + raise errors.AnsibleErrot("unexpected error: no hosts to poll") + self.jid = jid + + def poll(self): + """ Poll the job status. + + Returns the changes in this iteration.""" + self.runner.module_name = 'async_status' + self.runner.module_args = "jid=%s" % self.jid + self.runner.pattern = "*" + self.runner.background = 0 + + self.runner.inventory.restrict_to(self.hosts_to_poll) + results = self.runner.run() + self.runner.inventory.lift_restriction() + + hosts = [] + poll_results = { 'contacted': {}, 'dark': {}, 'polled': {}} + for (host, res) in results['contacted'].iteritems(): + if res.get('started',False): + hosts.append(host) + poll_results['polled'][host] = res + else: + self.results['contacted'][host] = res + poll_results['contacted'][host] = res + if 'failed' in res: + self.runner.callbacks.on_async_failed(host, res, self.jid) + else: + self.runner.callbacks.on_async_ok(host, res, self.jid) + for (host, res) in results['dark'].iteritems(): + self.results['dark'][host] = res + poll_results['dark'][host] = res + self.runner.callbacks.on_async_failed(host, res, self.jid) + + self.hosts_to_poll = hosts + if len(hosts)==0: + self.completed = True + + return poll_results + + def wait(self, seconds, poll_interval): + """ Wait a certain time for job completion, check status every poll_interval. """ + clock = seconds - poll_interval + while (clock >= 0 and not self.completed): + time.sleep(poll_interval) + + poll_results = self.poll() + + for (host, res) in poll_results['polled'].iteritems(): + if res.get('started'): + self.runner.callbacks.on_async_poll(host, res, self.jid, clock) + + clock = clock - poll_interval + + return self.results diff --git a/lib/ansible/runner.py b/lib/ansible/runner.py index 5e86c711f0f..0b9005e66d1 100644 --- a/lib/ansible/runner.py +++ b/lib/ansible/runner.py @@ -36,6 +36,7 @@ import ansible.connection import ansible.inventory from ansible import utils from ansible import errors +from ansible import poller from ansible import callbacks as ans_callbacks HAS_ATFORK=True @@ -682,12 +683,9 @@ class Runner(object): else: out=stdout - - # sudo mode paramiko doesn't capture stderr, so not relaying here either... return out - # ***************************************************** def _get_tmp_path(self, conn): @@ -798,12 +796,12 @@ class Runner(object): results = [ self._executor(h[1]) for h in hosts ] return self._partition_results(results) - def runAsync(self, time_limit): + def run_async(self, time_limit): ''' Run this module asynchronously and return a poller. ''' self.background = time_limit results = self.run() - return results, AsyncPoller(results, self) + return results, poller.AsyncPoller(results, self) class AsyncPoller(object): """ Manage asynchronous jobs. """