utilities modules: PEP8 compliancy (#27741)

This commit is contained in:
Dag Wieers 2017-08-10 10:36:28 +02:00 committed by John R Barker
parent f30589c91d
commit eee09565b1
4 changed files with 75 additions and 65 deletions

View file

@ -105,25 +105,31 @@ from ansible.module_utils.six.moves import socketserver
# leaving room for base64 (+33%) encoding and header (100 bytes) # leaving room for base64 (+33%) encoding and header (100 bytes)
# 4 * (975/3) + 100 = 1400 # 4 * (975/3) + 100 = 1400
# which leaves room for the TCP/IP header # which leaves room for the TCP/IP header
CHUNK_SIZE=10240 CHUNK_SIZE = 10240
# FIXME: this all should be moved to module_common, as it's # FIXME: this all should be moved to module_common, as it's
# pretty much a copy from the callbacks/util code # pretty much a copy from the callbacks/util code
DEBUG_LEVEL=0 DEBUG_LEVEL = 0
def log(msg, cap=0): def log(msg, cap=0):
global DEBUG_LEVEL global DEBUG_LEVEL
if DEBUG_LEVEL >= cap: if DEBUG_LEVEL >= cap:
syslog.syslog(syslog.LOG_NOTICE|syslog.LOG_DAEMON, msg) syslog.syslog(syslog.LOG_NOTICE | syslog.LOG_DAEMON, msg)
def v(msg): def v(msg):
log(msg, cap=1) log(msg, cap=1)
def vv(msg): def vv(msg):
log(msg, cap=2) log(msg, cap=2)
def vvv(msg): def vvv(msg):
log(msg, cap=3) log(msg, cap=3)
def vvvv(msg): def vvvv(msg):
log(msg, cap=4) log(msg, cap=4)
@ -137,6 +143,7 @@ except ImportError:
SOCKET_FILE = os.path.join(get_module_path(), '.ansible-accelerate', ".local.socket") SOCKET_FILE = os.path.join(get_module_path(), '.ansible-accelerate', ".local.socket")
def get_pid_location(module): def get_pid_location(module):
""" """
Try to find a pid directory in the common locations, falling Try to find a pid directory in the common locations, falling
@ -144,7 +151,7 @@ def get_pid_location(module):
""" """
for dir in ['/var/run', '/var/lib/run', '/run', os.path.expanduser("~/")]: for dir in ['/var/run', '/var/lib/run', '/run', os.path.expanduser("~/")]:
try: try:
if os.path.isdir(dir) and os.access(dir, os.R_OK|os.W_OK): if os.path.isdir(dir) and os.access(dir, os.R_OK | os.W_OK):
return os.path.join(dir, '.accelerate.pid') return os.path.join(dir, '.accelerate.pid')
except: except:
pass pass
@ -185,12 +192,13 @@ def daemonize_self(module, password, port, minutes, pid_file):
log('fork #2 failed: %d (%s)' % (e.errno, e.strerror)) log('fork #2 failed: %d (%s)' % (e.errno, e.strerror))
sys.exit(1) sys.exit(1)
dev_null = open('/dev/null','rw') dev_null = open('/dev/null', 'rw')
os.dup2(dev_null.fileno(), sys.stdin.fileno()) os.dup2(dev_null.fileno(), sys.stdin.fileno())
os.dup2(dev_null.fileno(), sys.stdout.fileno()) os.dup2(dev_null.fileno(), sys.stdout.fileno())
os.dup2(dev_null.fileno(), sys.stderr.fileno()) os.dup2(dev_null.fileno(), sys.stderr.fileno())
log("daemonizing successful") log("daemonizing successful")
class LocalSocketThread(Thread): class LocalSocketThread(Thread):
server = None server = None
terminated = False terminated = False
@ -271,6 +279,7 @@ class LocalSocketThread(Thread):
self.s.shutdown(socket.SHUT_RDWR) self.s.shutdown(socket.SHUT_RDWR)
self.s.close() self.s.close()
class ThreadWithReturnValue(Thread): class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs, Verbose) Thread.__init__(self, group, target, name, args, kwargs, Verbose)
@ -281,14 +290,16 @@ class ThreadWithReturnValue(Thread):
self._return = self._Thread__target(*self._Thread__args, self._return = self._Thread__target(*self._Thread__args,
**self._Thread__kwargs) **self._Thread__kwargs)
def join(self,timeout=None): def join(self, timeout=None):
Thread.join(self, timeout=timeout) Thread.join(self, timeout=timeout)
return self._return return self._return
class ThreadedTCPServer(socketserver.ThreadingTCPServer): class ThreadedTCPServer(socketserver.ThreadingTCPServer):
key_list = [] key_list = []
last_event = datetime.datetime.now() last_event = datetime.datetime.now()
last_event_lock = Lock() last_event_lock = Lock()
def __init__(self, server_address, RequestHandlerClass, module, password, timeout, use_ipv6=False): def __init__(self, server_address, RequestHandlerClass, module, password, timeout, use_ipv6=False):
self.module = module self.module = module
self.key_list.append(AesKey.Read(password)) self.key_list.append(AesKey.Read(password))
@ -309,6 +320,7 @@ class ThreadedTCPServer(socketserver.ThreadingTCPServer):
self.running = False self.running = False
socketserver.ThreadingTCPServer.shutdown(self) socketserver.ThreadingTCPServer.shutdown(self)
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
# the key to use for this connection # the key to use for this connection
active_key = None active_key = None
@ -324,7 +336,7 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
return self.request.sendall(packed_len + data) return self.request.sendall(packed_len + data)
def recv_data(self): def recv_data(self):
header_len = 8 # size of a packed unsigned long long header_len = 8 # size of a packed unsigned long long
data = "" data = ""
vvvv("in recv_data(), waiting for the header") vvvv("in recv_data(), waiting for the header")
while len(data) < header_len: while len(data) < header_len:
@ -339,9 +351,9 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
vvvv("exception received while waiting for recv(), returning None") vvvv("exception received while waiting for recv(), returning None")
return None return None
vvvv("in recv_data(), got the header, unpacking") vvvv("in recv_data(), got the header, unpacking")
data_len = struct.unpack('!Q',data[:header_len])[0] data_len = struct.unpack('!Q', data[:header_len])[0]
data = data[header_len:] data = data[header_len:]
vvvv("data received so far (expecting %d): %d" % (data_len,len(data))) vvvv("data received so far (expecting %d): %d" % (data_len, len(data)))
while len(data) < data_len: while len(data) < data_len:
try: try:
d = self.request.recv(data_len - len(data)) d = self.request.recv(data_len - len(data))
@ -349,7 +361,7 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
vvv("received nothing, bailing out") vvv("received nothing, bailing out")
return None return None
data += d data += d
vvvv("data received so far (expecting %d): %d" % (data_len,len(data))) vvvv("data received so far (expecting %d): %d" % (data_len, len(data)))
except: except:
# probably got a connection reset # probably got a connection reset
vvvv("exception received while waiting for recv(), returning None") vvvv("exception received while waiting for recv(), returning None")
@ -511,7 +523,7 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
response = self.active_key.Decrypt(response) response = self.active_key.Decrypt(response)
response = json.loads(response) response = json.loads(response)
if response.get('failed',False): if response.get('failed', False):
log("got a failed response from the master") log("got a failed response from the master")
return dict(failed=True, stderr="Master reported failure, aborting transfer") return dict(failed=True, stderr="Master reported failure, aborting transfer")
except Exception as e: except Exception as e:
@ -538,7 +550,7 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
os.makedirs(tmp_path, int('O700', 8)) os.makedirs(tmp_path, int('O700', 8))
except: except:
return dict(failed=True, msg='could not create a temporary directory at %s' % tmp_path) return dict(failed=True, msg='could not create a temporary directory at %s' % tmp_path)
(fd,out_path) = tempfile.mkstemp(prefix='ansible.', dir=tmp_path) (fd, out_path) = tempfile.mkstemp(prefix='ansible.', dir=tmp_path)
out_fd = os.fdopen(fd, 'w', 0) out_fd = os.fdopen(fd, 'w', 0)
final_path = data['out_path'] final_path = data['out_path']
else: else:
@ -546,7 +558,7 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
out_fd = open(out_path, 'w') out_fd = open(out_path, 'w')
try: try:
bytes=0 bytes = 0
while True: while True:
out = base64.b64decode(data['data']) out = base64.b64decode(data['data'])
bytes += len(out) bytes += len(out)
@ -575,6 +587,7 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
self.server.module.atomic_move(out_path, final_path) self.server.module.atomic_move(out_path, final_path)
return dict() return dict()
def daemonize(module, password, port, timeout, minutes, use_ipv6, pid_file): def daemonize(module, password, port, timeout, minutes, use_ipv6, pid_file):
try: try:
daemonize_self(module, password, port, minutes, pid_file) daemonize_self(module, password, port, minutes, pid_file)
@ -613,7 +626,7 @@ def daemonize(module, password, port, timeout, minutes, use_ipv6, pid_file):
server.allow_reuse_address = True server.allow_reuse_address = True
break break
except Exception as e: except Exception as e:
vv("Failed to create the TCP server (tries left = %d) (error: %s) " % (tries,e)) vv("Failed to create the TCP server (tries left = %d) (error: %s) " % (tries, e))
tries -= 1 tries -= 1
time.sleep(0.2) time.sleep(0.2)
@ -640,35 +653,36 @@ def daemonize(module, password, port, timeout, minutes, use_ipv6, pid_file):
log("exception caught, exiting accelerated mode: %s\n%s" % (e, tb)) log("exception caught, exiting accelerated mode: %s\n%s" % (e, tb))
sys.exit(0) sys.exit(0)
def main(): def main():
global DEBUG_LEVEL global DEBUG_LEVEL
module = AnsibleModule( module = AnsibleModule(
argument_spec = dict( argument_spec=dict(
port=dict(required=False, default=5099), port=dict(type='int', default=5099),
ipv6=dict(required=False, default=False, type='bool'), ipv6=dict(type='bool', default=False),
multi_key=dict(required=False, default=False, type='bool'), multi_key=dict(type='bool', default=False),
timeout=dict(required=False, default=300), timeout=dict(type='int', default=300),
password=dict(required=True, no_log=True), password=dict(type='str', required=True, no_log=True),
minutes=dict(required=False, default=30), minutes=dict(type='int', default=30),
debug=dict(required=False, default=0, type='int') debug=dict(type='int', default=0)
), ),
supports_check_mode=True supports_check_mode=True
) )
syslog.openlog('ansible-%s' % module._name) syslog.openlog('ansible-%s' % module._name)
password = base64.b64decode(module.params['password']) password = base64.b64decode(module.params['password'])
port = int(module.params['port']) port = int(module.params['port'])
timeout = int(module.params['timeout']) timeout = int(module.params['timeout'])
minutes = int(module.params['minutes']) minutes = int(module.params['minutes'])
debug = int(module.params['debug']) debug = int(module.params['debug'])
ipv6 = module.params['ipv6'] ipv6 = module.params['ipv6']
multi_key = module.params['multi_key'] multi_key = module.params['multi_key']
if not HAS_KEYCZAR: if not HAS_KEYCZAR:
module.fail_json(msg="keyczar is not installed (on the remote side)") module.fail_json(msg="keyczar is not installed (on the remote side)")
DEBUG_LEVEL=debug DEBUG_LEVEL = debug
pid_file = get_pid_location(module) pid_file = get_pid_location(module)
daemon_pid = None daemon_pid = None
@ -682,9 +696,7 @@ def main():
# whether other signals can be sent # whether other signals can be sent
os.kill(daemon_pid, 0) os.kill(daemon_pid, 0)
except OSError as e: except OSError as e:
message = 'the accelerate daemon appears to be running' message = 'the accelerate daemon appears to be running as a different user that this user cannot access pid=%s' % daemon_pid
message += 'as a different user that this user cannot access'
message += 'pid=%s' % daemon_pid
if e.errno == errno.EPERM: if e.errno == errno.EPERM:
# no permissions means the pid is probably # no permissions means the pid is probably
@ -726,5 +738,6 @@ def main():
# try to start up the daemon # try to start up the daemon
daemonize(module, password, port, timeout, minutes, ipv6, pid_file) daemonize(module, password, port, timeout, minutes, ipv6, pid_file)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View file

@ -51,11 +51,11 @@ def main():
module = AnsibleModule(argument_spec=dict( module = AnsibleModule(argument_spec=dict(
jid=dict(required=True), jid=dict(required=True),
mode=dict(default='status', choices=['status','cleanup']), mode=dict(default='status', choices=['status', 'cleanup']),
)) ))
mode = module.params['mode'] mode = module.params['mode']
jid = module.params['jid'] jid = module.params['jid']
# setup logging directory # setup logging directory
logdir = os.path.expanduser("~/.ansible_async") logdir = os.path.expanduser("~/.ansible_async")
@ -82,9 +82,9 @@ def main():
module.exit_json(results_file=log_path, ansible_job_id=jid, started=1, finished=0) module.exit_json(results_file=log_path, ansible_job_id=jid, started=1, finished=0)
else: else:
module.fail_json(ansible_job_id=jid, results_file=log_path, module.fail_json(ansible_job_id=jid, results_file=log_path,
msg="Could not parse job output: %s" % data, started=1, finished=1) msg="Could not parse job output: %s" % data, started=1, finished=1)
if not 'started' in data: if 'started' not in data:
data['finished'] = 1 data['finished'] = 1
data['ansible_job_id'] = jid data['ansible_job_id'] = jid
elif 'finished' not in data: elif 'finished' not in data:

View file

@ -27,9 +27,11 @@ PY3 = sys.version_info[0] == 3
syslog.openlog('ansible-%s' % os.path.basename(__file__)) syslog.openlog('ansible-%s' % os.path.basename(__file__))
syslog.syslog(syslog.LOG_NOTICE, 'Invoked with %s' % " ".join(sys.argv[1:])) syslog.syslog(syslog.LOG_NOTICE, 'Invoked with %s' % " ".join(sys.argv[1:]))
def notice(msg): def notice(msg):
syslog.syslog(syslog.LOG_NOTICE, msg) syslog.syslog(syslog.LOG_NOTICE, msg)
def daemonize_self(): def daemonize_self():
# daemonizing code: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012 # daemonizing code: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
try: try:
@ -60,6 +62,7 @@ def daemonize_self():
os.dup2(dev_null.fileno(), sys.stdout.fileno()) os.dup2(dev_null.fileno(), sys.stdout.fileno())
os.dup2(dev_null.fileno(), sys.stderr.fileno()) os.dup2(dev_null.fileno(), sys.stderr.fileno())
# NB: this function copied from module_utils/json_utils.py. Ensure any changes are propagated there. # NB: this function copied from module_utils/json_utils.py. Ensure any changes are propagated there.
# FUTURE: AnsibleModule-ify this module so it's Ansiballz-compatible and can use the module_utils copy of this function. # FUTURE: AnsibleModule-ify this module so it's Ansiballz-compatible and can use the module_utils copy of this function.
def _filter_non_json_lines(data): def _filter_non_json_lines(data):
@ -121,7 +124,7 @@ def _run_module(wrapped_cmd, jid, job_path):
tmp_job_path = job_path + ".tmp" tmp_job_path = job_path + ".tmp"
jobfile = open(tmp_job_path, "w") jobfile = open(tmp_job_path, "w")
jobfile.write(json.dumps({ "started" : 1, "finished" : 0, "ansible_job_id" : jid })) jobfile.write(json.dumps({"started": 1, "finished": 0, "ansible_job_id": jid}))
jobfile.close() jobfile.close()
os.rename(tmp_job_path, job_path) os.rename(tmp_job_path, job_path)
jobfile = open(tmp_job_path, "w") jobfile = open(tmp_job_path, "w")
@ -163,9 +166,9 @@ def _run_module(wrapped_cmd, jid, job_path):
e = sys.exc_info()[1] e = sys.exc_info()[1]
result = { result = {
"failed": 1, "failed": 1,
"cmd" : wrapped_cmd, "cmd": wrapped_cmd,
"msg": str(e), "msg": str(e),
"outdata": outdata, # temporary notice only "outdata": outdata, # temporary notice only
"stderr": stderr "stderr": stderr
} }
result['ansible_job_id'] = jid result['ansible_job_id'] = jid
@ -173,11 +176,11 @@ def _run_module(wrapped_cmd, jid, job_path):
except (ValueError, Exception): except (ValueError, Exception):
result = { result = {
"failed" : 1, "failed": 1,
"cmd" : wrapped_cmd, "cmd": wrapped_cmd,
"data" : outdata, # temporary notice only "data": outdata, # temporary notice only
"stderr": stderr, "stderr": stderr,
"msg" : traceback.format_exc() "msg": traceback.format_exc()
} }
result['ansible_job_id'] = jid result['ansible_job_id'] = jid
jobfile.write(json.dumps(result)) jobfile.write(json.dumps(result))
@ -186,16 +189,13 @@ def _run_module(wrapped_cmd, jid, job_path):
os.rename(tmp_job_path, job_path) os.rename(tmp_job_path, job_path)
####################
## main ##
####################
if __name__ == '__main__': if __name__ == '__main__':
if len(sys.argv) < 5: if len(sys.argv) < 5:
print(json.dumps({ print(json.dumps({
"failed" : True, "failed": True,
"msg" : "usage: async_wrapper <jid> <time_limit> <modulescript> <argsfile> [-preserve_tmp] " "msg": "usage: async_wrapper <jid> <time_limit> <modulescript> <argsfile> [-preserve_tmp] "
"Humans, do not call directly!" "Humans, do not call directly!"
})) }))
sys.exit(1) sys.exit(1)
@ -225,8 +225,8 @@ if __name__ == '__main__':
os.makedirs(jobdir) os.makedirs(jobdir)
except: except:
print(json.dumps({ print(json.dumps({
"failed" : 1, "failed": 1,
"msg" : "could not create: %s" % jobdir "msg": "could not create: %s" % jobdir
})) }))
# immediately exit this process, leaving an orphaned process # immediately exit this process, leaving an orphaned process
# running which immediately forks a supervisory timing process # running which immediately forks a supervisory timing process
@ -241,8 +241,8 @@ if __name__ == '__main__':
# this probably could be done with some IPC later. Modules should always read # this probably could be done with some IPC later. Modules should always read
# the argsfile at the very first start of their execution anyway # the argsfile at the very first start of their execution anyway
notice("Return async_wrapper task started.") notice("Return async_wrapper task started.")
print(json.dumps({ "started" : 1, "finished" : 0, "ansible_job_id" : jid, "results_file" : job_path, print(json.dumps({"started": 1, "finished": 0, "ansible_job_id": jid, "results_file": job_path,
"_ansible_suppress_tmpdir_delete": not preserve_tmp})) "_ansible_suppress_tmpdir_delete": not preserve_tmp}))
sys.stdout.flush() sys.stdout.flush()
time.sleep(1) time.sleep(1)
sys.exit(0) sys.exit(0)
@ -263,16 +263,16 @@ if __name__ == '__main__':
# set the child process group id to kill all children # set the child process group id to kill all children
os.setpgid(sub_pid, sub_pid) os.setpgid(sub_pid, sub_pid)
notice("Start watching %s (%s)"%(sub_pid, remaining)) notice("Start watching %s (%s)" % (sub_pid, remaining))
time.sleep(step) time.sleep(step)
while os.waitpid(sub_pid, os.WNOHANG) == (0, 0): while os.waitpid(sub_pid, os.WNOHANG) == (0, 0):
notice("%s still running (%s)"%(sub_pid, remaining)) notice("%s still running (%s)" % (sub_pid, remaining))
time.sleep(step) time.sleep(step)
remaining = remaining - step remaining = remaining - step
if remaining <= 0: if remaining <= 0:
notice("Now killing %s"%(sub_pid)) notice("Now killing %s" % (sub_pid))
os.killpg(sub_pid, signal.SIGKILL) os.killpg(sub_pid, signal.SIGKILL)
notice("Sent kill to group %s"%sub_pid) notice("Sent kill to group %s " % sub_pid)
time.sleep(1) time.sleep(1)
if not preserve_tmp: if not preserve_tmp:
shutil.rmtree(os.path.dirname(wrapped_module), True) shutil.rmtree(os.path.dirname(wrapped_module), True)
@ -283,9 +283,9 @@ if __name__ == '__main__':
sys.exit(0) sys.exit(0)
else: else:
# the child process runs the actual module # the child process runs the actual module
notice("Start module (%s)"%os.getpid()) notice("Start module (%s)" % os.getpid())
_run_module(cmd, jid, job_path) _run_module(cmd, jid, job_path)
notice("Module complete (%s)"%os.getpid()) notice("Module complete (%s)" % os.getpid())
sys.exit(0) sys.exit(0)
except SystemExit: except SystemExit:
@ -295,9 +295,9 @@ if __name__ == '__main__':
except Exception: except Exception:
e = sys.exc_info()[1] e = sys.exc_info()[1]
notice("error: %s"%(e)) notice("error: %s" % e)
print(json.dumps({ print(json.dumps({
"failed" : True, "failed": True,
"msg" : "FATAL ERROR: %s" % str(e) "msg": "FATAL ERROR: %s" % e
})) }))
sys.exit(1) sys.exit(1)

View file

@ -483,7 +483,4 @@ lib/ansible/modules/system/systemd.py
lib/ansible/modules/system/timezone.py lib/ansible/modules/system/timezone.py
lib/ansible/modules/system/ufw.py lib/ansible/modules/system/ufw.py
lib/ansible/modules/system/user.py lib/ansible/modules/system/user.py
lib/ansible/modules/utilities/helper/_accelerate.py
lib/ansible/modules/utilities/logic/async_status.py
lib/ansible/modules/utilities/logic/async_wrapper.py
lib/ansible/playbook/base.py lib/ansible/playbook/base.py