From d192e2c3e32575713d94a8f7fd19c4d9980a0e90 Mon Sep 17 00:00:00 2001 From: Brian Coca Date: Sat, 31 Oct 2015 21:35:48 -0400 Subject: [PATCH] code cleanup and reoorg, renamed vars and functions to actual purpose reneabled logging of steps --- utilities/logic/async_wrapper.py | 204 ++++++++++++++++--------------- 1 file changed, 103 insertions(+), 101 deletions(-) diff --git a/utilities/logic/async_wrapper.py b/utilities/logic/async_wrapper.py index 2bc2dc21823..55f5283ed79 100644 --- a/utilities/logic/async_wrapper.py +++ b/utilities/logic/async_wrapper.py @@ -27,15 +27,20 @@ import shlex import os import subprocess import sys -import datetime import traceback import signal import time import syslog + +syslog.openlog('ansible-%s' % os.path.basename(__file__)) +syslog.syslog(syslog.LOG_NOTICE, 'Invoked with %s' % " ".join(sys.argv[1:])) + +def notice(msg): + syslog.syslog(syslog.LOG_NOTICE, msg) + def daemonize_self(): # daemonizing code: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012 - # logger.info("cobblerd started") try: pid = os.fork() if pid > 0: @@ -65,50 +70,21 @@ def daemonize_self(): os.dup2(dev_null.fileno(), sys.stdout.fileno()) os.dup2(dev_null.fileno(), sys.stderr.fileno()) -if len(sys.argv) < 3: - print json.dumps({ - "failed" : True, - "msg" : "usage: async_wrapper . Humans, do not call directly!" - }) - sys.exit(1) -jid = "%s.%d" % (sys.argv[1], os.getpid()) -time_limit = sys.argv[2] -wrapped_module = sys.argv[3] -argsfile = sys.argv[4] -cmd = "%s %s" % (wrapped_module, argsfile) +def _run_module(wrapped_cmd, jid, job_path): -syslog.openlog('ansible-%s' % os.path.basename(__file__)) -syslog.syslog(syslog.LOG_NOTICE, 'Invoked with %s' % " ".join(sys.argv[1:])) - -# setup logging directory -logdir = os.path.expanduser("~/.ansible_async") -log_path = os.path.join(logdir, jid) - -if not os.path.exists(logdir): - try: - os.makedirs(logdir) - except: - print json.dumps({ - "failed" : 1, - "msg" : "could not create: %s" % logdir - }) - -def _run_command(wrapped_cmd, jid, log_path): - - logfile = open(log_path, "w") - logfile.write(json.dumps({ "started" : 1, "ansible_job_id" : jid })) - logfile.close() - logfile = open(log_path, "w") + jobfile = open(job_path, "w") + jobfile.write(json.dumps({ "started" : 1, "ansible_job_id" : jid })) + jobfile.close() + jobfile = open(job_path, "w") result = {} outdata = '' try: cmd = shlex.split(wrapped_cmd) - script = subprocess.Popen(cmd, shell=False, - stdin=None, stdout=logfile, stderr=logfile) + script = subprocess.Popen(cmd, shell=False, stdin=None, stdout=jobfile, stderr=jobfile) script.communicate() - outdata = file(log_path).read() + outdata = file(job_path).read() result = json.loads(outdata) except (OSError, IOError), e: @@ -118,83 +94,109 @@ def _run_command(wrapped_cmd, jid, log_path): "msg": str(e), } result['ansible_job_id'] = jid - logfile.write(json.dumps(result)) + jobfile.write(json.dumps(result)) except: result = { "failed" : 1, "cmd" : wrapped_cmd, - "data" : outdata, # temporary debug only + "data" : outdata, # temporary notice only "msg" : traceback.format_exc() } result['ansible_job_id'] = jid - logfile.write(json.dumps(result)) - logfile.close() + jobfile.write(json.dumps(result)) + jobfile.close() -# immediately exit this process, leaving an orphaned process -# running which immediately forks a supervisory timing process -#import logging -#import logging.handlers +#################### +## main ## +#################### +if __name__ == '__main__': -#logger = logging.getLogger("ansible_async") -#logger.setLevel(logging.WARNING) -#logger.addHandler( logging.handlers.SysLogHandler("/dev/log") ) -def debug(msg): - #logger.warning(msg) - pass + if len(sys.argv) < 3: + print json.dumps({ + "failed" : True, + "msg" : "usage: async_wrapper . Humans, do not call directly!" + }) + sys.exit(1) -try: - pid = os.fork() - if pid: - # Notify the overlord that the async process started + jid = "%s.%d" % (sys.argv[1], os.getpid()) + time_limit = sys.argv[2] + wrapped_module = sys.argv[3] + argsfile = sys.argv[4] + cmd = "%s %s" % (wrapped_module, argsfile) + step = 5 - # we need to not return immmediately such that the launched command has an attempt - # to initialize PRIOR to ansible trying to clean up the launch directory (and argsfile) - # this probably could be done with some IPC later. Modules should always read - # the argsfile at the very first start of their execution anyway - time.sleep(1) - debug("Return async_wrapper task started.") - print json.dumps({ "started" : 1, "ansible_job_id" : jid, "results_file" : log_path }) - sys.stdout.flush() - sys.exit(0) - else: - # The actual wrapper process + # setup job output directory + jobdir = os.path.expanduser("~/.ansible_async") + job_path = os.path.join(jobdir, jid) - # Daemonize, so we keep on running - daemonize_self() + if not os.path.exists(jobdir): + try: + os.makedirs(jobdir) + except: + print json.dumps({ + "failed" : 1, + "msg" : "could not create: %s" % jobdir + }) + # immediately exit this process, leaving an orphaned process + # running which immediately forks a supervisory timing process - # we are now daemonized, create a supervisory process - debug("Starting module and watcher") + try: + pid = os.fork() + if pid: + # Notify the overlord that the async process started - sub_pid = os.fork() - if sub_pid: - # the parent stops the process after the time limit - remaining = int(time_limit) - - # set the child process group id to kill all children - os.setpgid(sub_pid, sub_pid) - - debug("Start watching %s (%s)"%(sub_pid, remaining)) - time.sleep(5) - while os.waitpid(sub_pid, os.WNOHANG) == (0, 0): - debug("%s still running (%s)"%(sub_pid, remaining)) - time.sleep(5) - remaining = remaining - 5 - if remaining <= 0: - debug("Now killing %s"%(sub_pid)) - os.killpg(sub_pid, signal.SIGKILL) - debug("Sent kill to group %s"%sub_pid) - time.sleep(1) - sys.exit(0) - debug("Done in kid B.") - os._exit(0) - else: - # the child process runs the actual module - debug("Start module (%s)"%os.getpid()) - _run_command(cmd, jid, log_path) - debug("Module complete (%s)"%os.getpid()) + # we need to not return immmediately such that the launched command has an attempt + # to initialize PRIOR to ansible trying to clean up the launch directory (and argsfile) + # this probably could be done with some IPC later. Modules should always read + # the argsfile at the very first start of their execution anyway + notice("Return async_wrapper task started.") + print json.dumps({ "started" : 1, "ansible_job_id" : jid, "results_file" : job_path }) + sys.stdout.flush() + time.sleep(1) sys.exit(0) + else: + # The actual wrapper process -except Exception, err: - debug("error: %s"%(err)) - raise err + # Daemonize, so we keep on running + daemonize_self() + + # we are now daemonized, create a supervisory process + notice("Starting module and watcher") + + sub_pid = os.fork() + if sub_pid: + # the parent stops the process after the time limit + remaining = int(time_limit) + + # set the child process group id to kill all children + os.setpgid(sub_pid, sub_pid) + + notice("Start watching %s (%s)"%(sub_pid, remaining)) + time.sleep(step) + while os.waitpid(sub_pid, os.WNOHANG) == (0, 0): + notice("%s still running (%s)"%(sub_pid, remaining)) + time.sleep(step) + remaining = remaining - step + if remaining <= 0: + notice("Now killing %s"%(sub_pid)) + os.killpg(sub_pid, signal.SIGKILL) + notice("Sent kill to group %s"%sub_pid) + time.sleep(1) + sys.exit(0) + notice("Done in kid B.") + sys.exit(0) + else: + # the child process runs the actual module + notice("Start module (%s)"%os.getpid()) + _run_module(cmd, jid, job_path) + notice("Module complete (%s)"%os.getpid()) + sys.exit(0) + + except Exception, err: + notice("error: %s"%(err)) + print json.dumps({ + "failed" : True, + "msg" : "FATAL ERROR: %s" % str(err) + }) + sys.exit(1)