From 515fbd5a17b0fc235b46164009fb370727cad234 Mon Sep 17 00:00:00 2001 From: Michael DeHaan Date: Sat, 27 Apr 2013 09:46:48 -0400 Subject: [PATCH] Restructure output to prevent rare cases of interlaced I/O in multiprocessing paths. --- lib/ansible/callbacks.py | 164 +++++++++++++++++++++++---------------- 1 file changed, 99 insertions(+), 65 deletions(-) diff --git a/lib/ansible/callbacks.py b/lib/ansible/callbacks.py index 75299e7284c..04dedb640d8 100644 --- a/lib/ansible/callbacks.py +++ b/lib/ansible/callbacks.py @@ -22,32 +22,55 @@ import os import subprocess import random import fnmatch +import tempfile +import fcntl from ansible.color import stringc -cowsay = None -if os.getenv("ANSIBLE_NOCOWS") is not None: - cowsay = None -elif os.path.exists("/usr/bin/cowsay"): - cowsay = "/usr/bin/cowsay" -elif os.path.exists("/usr/games/cowsay"): - cowsay = "/usr/games/cowsay" -elif os.path.exists("/usr/local/bin/cowsay"): - # BSD path for cowsay - cowsay = "/usr/local/bin/cowsay" -elif os.path.exists("/opt/local/bin/cowsay"): - # MacPorts path for cowsay - cowsay = "/opt/local/bin/cowsay" - -noncow = os.getenv("ANSIBLE_COW_SELECTION",None) -if cowsay and noncow == 'random': - cmd = subprocess.Popen([cowsay, "-l"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - (out, err) = cmd.communicate() - cows = out.split() - cows.append(False) - noncow = random.choice(cows) - callback_plugins = [x for x in utils.plugins.callback_loader.all()] +def get_cowsay_info(): + cowsay = None + if os.getenv("ANSIBLE_NOCOWS") is not None: + cowsay = None + elif os.path.exists("/usr/bin/cowsay"): + cowsay = "/usr/bin/cowsay" + elif os.path.exists("/usr/games/cowsay"): + cowsay = "/usr/games/cowsay" + elif os.path.exists("/usr/local/bin/cowsay"): + # BSD path for cowsay + cowsay = "/usr/local/bin/cowsay" + elif os.path.exists("/opt/local/bin/cowsay"): + # MacPorts path for cowsay + cowsay = "/opt/local/bin/cowsay" + + noncow = os.getenv("ANSIBLE_COW_SELECTION",None) + if cowsay and noncow == 'random': + cmd = subprocess.Popen([cowsay, "-l"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (out, err) = cmd.communicate() + cows = out.split() + cows.append(False) + noncow = random.choice(cows) + return (cowsay, noncow) + +cowsay, noncow = get_cowsay_info() + +def log_lockfile(): + tempdir = tempfile.gettempdir() + uid = os.getuid() + path = os.path.join(tempdir, ".ansible-lock.%s" % uid) + if not os.path.exists(path): + fh = open(path, 'w') + fh.close() + return path + +LOG_LOCK = open(log_lockfile(), 'r') + +def log_flock(): + fcntl.flock(LOG_LOCK, fcntl.LOCK_EX) + +def log_unflock(): + fcntl.flock(LOG_LOCK, fcntl.LOCK_UN) + def set_play(callback, play): ''' used to notify callback plugins of context ''' callback.play = play @@ -60,6 +83,17 @@ def set_task(callback, task): for callback_plugin in callback_plugins: callback_plugin.task = task +def display(msg, color=None, stderr=False): + # prevent a very rare case of interlaced multiprocess I/O + log_flock() + msg2 = msg + if color: + msg2 = stringc(msg, color) + if not stderr: + print msg2 + else: + print >>sys.stderr, msg2 + log_unflock() def call_callback_module(method_name, *args, **kwargs): @@ -81,9 +115,9 @@ def vvv(msg, host=None): def verbose(msg, host=None, caplevel=2): if utils.VERBOSITY > caplevel: if host is None: - print stringc(msg, 'blue') + display(msg, color='blue') else: - print stringc("<%s> %s" % (host, msg), 'blue') + display("<%s> %s" % (host, msg), color='blue') class AggregateStats(object): ''' holds stats about per-host activity during playbook runs ''' @@ -195,17 +229,17 @@ def host_report_msg(hostname, module_name, result, oneline): ''' summarize the JSON results for a particular host ''' failed = utils.is_failed(result) - msg = '' + msg = ('', None) if module_name in [ 'command', 'shell', 'raw' ] and 'ansible_job_id' not in result and result.get('parsed',True) != False: if not failed: - msg = command_generic_msg(hostname, result, oneline, 'success') + msg = (command_generic_msg(hostname, result, oneline, 'success'), 'green') else: - msg = command_generic_msg(hostname, result, oneline, 'FAILED') + msg = (command_generic_msg(hostname, result, oneline, 'FAILED'), 'red') else: if not failed: - msg = regular_generic_msg(hostname, result, oneline, 'success') + msg = (regular_generic_msg(hostname, result, oneline, 'success'), 'green') else: - msg = regular_generic_msg(hostname, result, oneline, 'FAILED') + msg = (regular_generic_msg(hostname, result, oneline, 'FAILED'), 'red') return msg ############################################### @@ -267,7 +301,7 @@ class CliRunnerCallbacks(DefaultRunnerCallbacks): def on_unreachable(self, host, res): if type(res) == dict: res = res.get('msg','') - print "%s | FAILED => %s" % (host, res) + display("%s | FAILED => %s" % (host, res)) if self.options.tree: utils.write_tree_file( self.options.tree, host, @@ -276,15 +310,15 @@ class CliRunnerCallbacks(DefaultRunnerCallbacks): super(CliRunnerCallbacks, self).on_unreachable(host, res) def on_skipped(self, host, item=None): - print "%s | skipped" % (host) + display("%s | skipped" % (host)) super(CliRunnerCallbacks, self).on_skipped(host, item) def on_error(self, host, err): - print >>sys.stderr, "err: [%s] => %s\n" % (host, err) + display("err: [%s] => %s\n" % (host, err), stderr=True) super(CliRunnerCallbacks, self).on_error(host, err) def on_no_hosts(self): - print >>sys.stderr, "no hosts matched\n" + display("no hosts matched\n", stderr=True) super(CliRunnerCallbacks, self).on_no_hosts() def on_async_poll(self, host, res, jid, clock): @@ -292,26 +326,27 @@ class CliRunnerCallbacks(DefaultRunnerCallbacks): self._async_notified[jid] = clock + 1 if self._async_notified[jid] > clock: self._async_notified[jid] = clock - print " polling, %ss remaining"%(jid, clock) + display(" polling, %ss remaining" % (jid, clock)) super(CliRunnerCallbacks, self).on_async_poll(host, res, jid, clock) def on_async_ok(self, host, res, jid): - print " finished on %s => %s"%(jid, host, utils.jsonify(res,format=True)) + display(" finished on %s => %s"%(jid, host, utils.jsonify(res,format=True))) super(CliRunnerCallbacks, self).on_async_ok(host, res, jid) def on_async_failed(self, host, res, jid): - print " FAILED on %s => %s"%(jid, host, utils.jsonify(res,format=True)) + display(" FAILED on %s => %s"%(jid, host, utils.jsonify(res,format=True))) super(CliRunnerCallbacks, self).on_async_failed(host,res,jid) def _on_any(self, host, result): result2 = result.copy() result2.pop('invocation', None) - print host_report_msg(host, self.options.module_name, result2, self.options.one_line) + (msg, color) = host_report_msg(host, self.options.module_name, result2, self.options.one_line) + display(msg, color=color) if self.options.tree: utils.write_tree_file(self.options.tree, host, utils.jsonify(result2,format=True)) def on_file_diff(self, host, diff): - print utils.get_diff(diff) + display(utils.get_diff(diff)) super(CliRunnerCallbacks, self).on_file_diff(host, diff) ######################################################################## @@ -332,7 +367,7 @@ class PlaybookRunnerCallbacks(DefaultRunnerCallbacks): msg = "fatal: [%s] => (item=%s) => %s" % (host, item, results) else: msg = "fatal: [%s] => %s" % (host, results) - print stringc(msg, 'red') + display(msg, color='red') super(PlaybookRunnerCallbacks, self).on_unreachable(host, results) def on_failed(self, host, results, ignore_errors=False): @@ -353,18 +388,18 @@ class PlaybookRunnerCallbacks(DefaultRunnerCallbacks): msg = "failed: [%s] => (item=%s) => %s" % (host, item, utils.jsonify(results2)) else: msg = "failed: [%s] => %s" % (host, utils.jsonify(results2)) - print stringc(msg, 'red') + display(msg, color='red') if stderr: - print stringc("stderr: %s" % stderr, 'red') + display("stderr: %s" % stderr, color='red') if stdout: - print stringc("stdout: %s" % stdout, 'red') + display("stdout: %s" % stdout, color='red') if returned_msg: - print stringc("msg: %s" % returned_msg, 'red') + display("msg: %s" % returned_msg, color='red') if not parsed and module_msg: - print stringc("invalid output was: %s" % module_msg, 'red') + display("invalid output was: %s" % module_msg, color='red') if ignore_errors: - print stringc("...ignoring", 'cyan') + display("...ignoring", color='cyan') super(PlaybookRunnerCallbacks, self).on_failed(host, results, ignore_errors=ignore_errors) def on_ok(self, host, host_result): @@ -395,9 +430,9 @@ class PlaybookRunnerCallbacks(DefaultRunnerCallbacks): if msg != '': if not changed: - print stringc(msg, 'green') + display(msg, color='green') else: - print stringc(msg, 'yellow') + display(msg, color='yellow') super(PlaybookRunnerCallbacks, self).on_ok(host, host_result) def on_error(self, host, err): @@ -409,8 +444,7 @@ class PlaybookRunnerCallbacks(DefaultRunnerCallbacks): else: msg = "err: [%s] => %s" % (host, err) - msg = stringc(msg, 'red') - print >>sys.stderr, msg + display(msg, color='red', stderr=True) super(PlaybookRunnerCallbacks, self).on_error(host, err) def on_skipped(self, host, item=None): @@ -419,11 +453,11 @@ class PlaybookRunnerCallbacks(DefaultRunnerCallbacks): msg = "skipping: [%s] => (item=%s)" % (host, item) else: msg = "skipping: [%s]" % host - print stringc(msg, 'cyan') + display(msg, color='cyan') super(PlaybookRunnerCallbacks, self).on_skipped(host, item) def on_no_hosts(self): - print stringc("FATAL: no hosts matched or all hosts have already failed -- aborting\n", 'red') + display("FATAL: no hosts matched or all hosts have already failed -- aborting\n", color='red') super(PlaybookRunnerCallbacks, self).on_no_hosts() def on_async_poll(self, host, res, jid, clock): @@ -432,21 +466,21 @@ class PlaybookRunnerCallbacks(DefaultRunnerCallbacks): if self._async_notified[jid] > clock: self._async_notified[jid] = clock msg = " polling, %ss remaining"%(jid, clock) - print stringc(msg, 'cyan') + display(msg, color='cyan') super(PlaybookRunnerCallbacks, self).on_async_poll(host,res,jid,clock) def on_async_ok(self, host, res, jid): msg = " finished on %s"%(jid, host) - print stringc(msg, 'cyan') + display(msg, color='cyan') super(PlaybookRunnerCallbacks, self).on_async_ok(host, res, jid) def on_async_failed(self, host, res, jid): msg = " FAILED on %s"%(jid, host) - print stringc(msg, 'red') + display(msg, color='red') super(PlaybookRunnerCallbacks, self).on_async_failed(host,res,jid) def on_file_diff(self, host, diff): - print utils.get_diff(diff) + display(utils.get_diff(diff)) super(PlaybookRunnerCallbacks, self).on_file_diff(host, diff) ######################################################################## @@ -465,11 +499,11 @@ class PlaybookCallbacks(object): call_callback_module('playbook_on_notify', host, handler) def on_no_hosts_matched(self): - print stringc("skipping: no hosts matched", 'cyan') + display("skipping: no hosts matched", color='cyan') call_callback_module('playbook_on_no_hosts_matched') def on_no_hosts_remaining(self): - print stringc("\nFATAL: all hosts have already failed -- aborting", 'red') + display("\nFATAL: all hosts have already failed -- aborting", color='red') call_callback_module('playbook_on_no_hosts_remaining') def on_task_start(self, name, is_conditional): @@ -488,15 +522,15 @@ class PlaybookCallbacks(object): resp = raw_input('Perform task: %s (y/n/c): ' % name) if resp.lower() in ['y','yes']: self.skip_task = False - print banner(msg) + display(banner(msg)) elif resp.lower() in ['c', 'continue']: self.skip_task = False self.step = False - print banner(msg) + display(banner(msg)) else: self.skip_task = True else: - print banner(msg) + display(banner(msg)) call_callback_module('playbook_on_task_start', name, is_conditional) @@ -519,7 +553,7 @@ class PlaybookCallbacks(object): second = prompt("confirm " + msg, private) if result == second: break - print "***** VALUES ENTERED DO NOT MATCH ****" + display("***** VALUES ENTERED DO NOT MATCH ****") else: result = prompt(msg, private) @@ -538,21 +572,21 @@ class PlaybookCallbacks(object): return result def on_setup(self): - print banner("GATHERING FACTS") + display(banner("GATHERING FACTS")) call_callback_module('playbook_on_setup') def on_import_for_host(self, host, imported_file): msg = "%s: importing %s" % (host, imported_file) - print stringc(msg, 'cyan') + display(msg, color='cyan') call_callback_module('playbook_on_import_for_host', host, imported_file) def on_not_import_for_host(self, host, missing_file): msg = "%s: not importing file: %s" % (host, missing_file) - print stringc(msg, 'cyan') + display(msg, color='cyan') call_callback_module('playbook_on_not_import_for_host', host, missing_file) def on_play_start(self, pattern): - print banner("PLAY [%s]" % pattern) + display(banner("PLAY [%s]" % pattern)) call_callback_module('playbook_on_play_start', pattern) def on_stats(self, stats):