Code cleanup and reorganization; renamed variables and functions to reflect their actual purpose.

Re-enabled logging of steps.
This commit is contained in:
Brian Coca 2015-10-31 21:35:48 -04:00
parent 89957eed53
commit d192e2c3e3

View file

@ -27,15 +27,20 @@ import shlex
import os import os
import subprocess import subprocess
import sys import sys
import datetime
import traceback import traceback
import signal import signal
import time import time
import syslog import syslog
syslog.openlog('ansible-%s' % os.path.basename(__file__))
syslog.syslog(syslog.LOG_NOTICE, 'Invoked with %s' % " ".join(sys.argv[1:]))
def notice(msg):
syslog.syslog(syslog.LOG_NOTICE, msg)
def daemonize_self(): def daemonize_self():
# daemonizing code: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012 # daemonizing code: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
# logger.info("cobblerd started")
try: try:
pid = os.fork() pid = os.fork()
if pid > 0: if pid > 0:
@ -65,50 +70,21 @@ def daemonize_self():
os.dup2(dev_null.fileno(), sys.stdout.fileno()) os.dup2(dev_null.fileno(), sys.stdout.fileno())
os.dup2(dev_null.fileno(), sys.stderr.fileno()) os.dup2(dev_null.fileno(), sys.stderr.fileno())
if len(sys.argv) < 3:
print json.dumps({
"failed" : True,
"msg" : "usage: async_wrapper <jid> <time_limit> <modulescript> <argsfile>. Humans, do not call directly!"
})
sys.exit(1)
jid = "%s.%d" % (sys.argv[1], os.getpid()) def _run_module(wrapped_cmd, jid, job_path):
time_limit = sys.argv[2]
wrapped_module = sys.argv[3]
argsfile = sys.argv[4]
cmd = "%s %s" % (wrapped_module, argsfile)
syslog.openlog('ansible-%s' % os.path.basename(__file__)) jobfile = open(job_path, "w")
syslog.syslog(syslog.LOG_NOTICE, 'Invoked with %s' % " ".join(sys.argv[1:])) jobfile.write(json.dumps({ "started" : 1, "ansible_job_id" : jid }))
jobfile.close()
# setup logging directory jobfile = open(job_path, "w")
logdir = os.path.expanduser("~/.ansible_async")
log_path = os.path.join(logdir, jid)
if not os.path.exists(logdir):
try:
os.makedirs(logdir)
except:
print json.dumps({
"failed" : 1,
"msg" : "could not create: %s" % logdir
})
def _run_command(wrapped_cmd, jid, log_path):
logfile = open(log_path, "w")
logfile.write(json.dumps({ "started" : 1, "ansible_job_id" : jid }))
logfile.close()
logfile = open(log_path, "w")
result = {} result = {}
outdata = '' outdata = ''
try: try:
cmd = shlex.split(wrapped_cmd) cmd = shlex.split(wrapped_cmd)
script = subprocess.Popen(cmd, shell=False, script = subprocess.Popen(cmd, shell=False, stdin=None, stdout=jobfile, stderr=jobfile)
stdin=None, stdout=logfile, stderr=logfile)
script.communicate() script.communicate()
outdata = file(log_path).read() outdata = file(job_path).read()
result = json.loads(outdata) result = json.loads(outdata)
except (OSError, IOError), e: except (OSError, IOError), e:
@ -118,83 +94,109 @@ def _run_command(wrapped_cmd, jid, log_path):
"msg": str(e), "msg": str(e),
} }
result['ansible_job_id'] = jid result['ansible_job_id'] = jid
logfile.write(json.dumps(result)) jobfile.write(json.dumps(result))
except: except:
result = { result = {
"failed" : 1, "failed" : 1,
"cmd" : wrapped_cmd, "cmd" : wrapped_cmd,
"data" : outdata, # temporary debug only "data" : outdata, # temporary notice only
"msg" : traceback.format_exc() "msg" : traceback.format_exc()
} }
result['ansible_job_id'] = jid result['ansible_job_id'] = jid
logfile.write(json.dumps(result)) jobfile.write(json.dumps(result))
logfile.close() jobfile.close()
# immediately exit this process, leaving an orphaned process
# running which immediately forks a supervisory timing process
#import logging ####################
#import logging.handlers ## main ##
####################
if __name__ == '__main__':
#logger = logging.getLogger("ansible_async") if len(sys.argv) < 3:
#logger.setLevel(logging.WARNING) print json.dumps({
#logger.addHandler( logging.handlers.SysLogHandler("/dev/log") ) "failed" : True,
def debug(msg): "msg" : "usage: async_wrapper <jid> <time_limit> <modulescript> <argsfile>. Humans, do not call directly!"
#logger.warning(msg) })
pass sys.exit(1)
try: jid = "%s.%d" % (sys.argv[1], os.getpid())
pid = os.fork() time_limit = sys.argv[2]
if pid: wrapped_module = sys.argv[3]
# Notify the overlord that the async process started argsfile = sys.argv[4]
cmd = "%s %s" % (wrapped_module, argsfile)
step = 5
# we need to not return immediately such that the launched command has an attempt # setup job output directory
# to initialize PRIOR to ansible trying to clean up the launch directory (and argsfile) jobdir = os.path.expanduser("~/.ansible_async")
# this probably could be done with some IPC later. Modules should always read job_path = os.path.join(jobdir, jid)
# the argsfile at the very first start of their execution anyway
time.sleep(1)
debug("Return async_wrapper task started.")
print json.dumps({ "started" : 1, "ansible_job_id" : jid, "results_file" : log_path })
sys.stdout.flush()
sys.exit(0)
else:
# The actual wrapper process
# Daemonize, so we keep on running if not os.path.exists(jobdir):
daemonize_self() try:
os.makedirs(jobdir)
except:
print json.dumps({
"failed" : 1,
"msg" : "could not create: %s" % jobdir
})
# immediately exit this process, leaving an orphaned process
# running which immediately forks a supervisory timing process
# we are now daemonized, create a supervisory process try:
debug("Starting module and watcher") pid = os.fork()
if pid:
# Notify the overlord that the async process started
sub_pid = os.fork() # we need to not return immediately such that the launched command has an attempt
if sub_pid: # to initialize PRIOR to ansible trying to clean up the launch directory (and argsfile)
# the parent stops the process after the time limit # this probably could be done with some IPC later. Modules should always read
remaining = int(time_limit) # the argsfile at the very first start of their execution anyway
notice("Return async_wrapper task started.")
# set the child process group id to kill all children print json.dumps({ "started" : 1, "ansible_job_id" : jid, "results_file" : job_path })
os.setpgid(sub_pid, sub_pid) sys.stdout.flush()
time.sleep(1)
debug("Start watching %s (%s)"%(sub_pid, remaining))
time.sleep(5)
while os.waitpid(sub_pid, os.WNOHANG) == (0, 0):
debug("%s still running (%s)"%(sub_pid, remaining))
time.sleep(5)
remaining = remaining - 5
if remaining <= 0:
debug("Now killing %s"%(sub_pid))
os.killpg(sub_pid, signal.SIGKILL)
debug("Sent kill to group %s"%sub_pid)
time.sleep(1)
sys.exit(0)
debug("Done in kid B.")
os._exit(0)
else:
# the child process runs the actual module
debug("Start module (%s)"%os.getpid())
_run_command(cmd, jid, log_path)
debug("Module complete (%s)"%os.getpid())
sys.exit(0) sys.exit(0)
else:
# The actual wrapper process
except Exception, err: # Daemonize, so we keep on running
debug("error: %s"%(err)) daemonize_self()
raise err
# we are now daemonized, create a supervisory process
notice("Starting module and watcher")
sub_pid = os.fork()
if sub_pid:
# the parent stops the process after the time limit
remaining = int(time_limit)
# set the child process group id to kill all children
os.setpgid(sub_pid, sub_pid)
notice("Start watching %s (%s)"%(sub_pid, remaining))
time.sleep(step)
while os.waitpid(sub_pid, os.WNOHANG) == (0, 0):
notice("%s still running (%s)"%(sub_pid, remaining))
time.sleep(step)
remaining = remaining - step
if remaining <= 0:
notice("Now killing %s"%(sub_pid))
os.killpg(sub_pid, signal.SIGKILL)
notice("Sent kill to group %s"%sub_pid)
time.sleep(1)
sys.exit(0)
notice("Done in kid B.")
sys.exit(0)
else:
# the child process runs the actual module
notice("Start module (%s)"%os.getpid())
_run_module(cmd, jid, job_path)
notice("Module complete (%s)"%os.getpid())
sys.exit(0)
except Exception, err:
notice("error: %s"%(err))
print json.dumps({
"failed" : True,
"msg" : "FATAL ERROR: %s" % str(err)
})
sys.exit(1)