From 52a42bf6075da0bc3064f700cdd8bb14a2144416 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Mon, 30 Sep 2013 14:08:07 -0500 Subject: [PATCH] Add more verbose debugging options for accelerate --- .../runner/connection_plugins/accelerate.py | 13 +++- library/utilities/accelerate | 65 ++++++++++++++----- 2 files changed, 59 insertions(+), 19 deletions(-) diff --git a/lib/ansible/runner/connection_plugins/accelerate.py b/lib/ansible/runner/connection_plugins/accelerate.py index 61400d0ab3c..d6ce7243f1b 100644 --- a/lib/ansible/runner/connection_plugins/accelerate.py +++ b/lib/ansible/runner/connection_plugins/accelerate.py @@ -21,7 +21,7 @@ import base64 import socket import struct import time -from ansible.callbacks import vvv +from ansible.callbacks import vvv, vvvv from ansible.runner.connection_plugins.ssh import Connection as SSHConnection from ansible.runner.connection_plugins.paramiko_ssh import Connection as ParamikoConnection from ansible import utils @@ -84,12 +84,13 @@ class Connection(object): utils.AES_KEYS = self.runner.aes_keys def _execute_accelerate_module(self): - args = "password=%s port=%s" % (base64.b64encode(self.key.__str__()), str(self.accport)) + args = "password=%s port=%s debug=%d" % (base64.b64encode(self.key.__str__()), str(self.accport), int(utils.VERBOSITY)) inject = dict(password=self.key) if self.runner.accelerate_inventory_host: inject = utils.combine_vars(inject, self.runner.inventory.get_variables(self.runner.accelerate_inventory_host)) else: inject = utils.combine_vars(inject, self.runner.inventory.get_variables(self.host)) + vvvv("attempting to start up the accelerate daemon...") self.ssh.connect() tmp_path = self.runner._make_tmp_path(self.ssh) return self.runner._execute_module(self.ssh, tmp_path, 'accelerate', args, inject=inject) @@ -103,11 +104,13 @@ class Connection(object): tries = 3 self.conn = socket.socket() self.conn.settimeout(300.0) + vvvv("attempting connection to %s via the accelerated port %d" % (self.host,self.accport)) while tries > 0: try: self.conn.connect((self.host,self.accport)) break except: + vvvv("failed, retrying...") time.sleep(0.1) tries -= 1 if tries == 0: @@ -133,18 +136,24 @@ class Connection(object): header_len = 8 # size of a packed unsigned long long data = b"" try: + vvvv("%s: in recv_data(), waiting for the header" % self.host) while len(data) < header_len: d = self.conn.recv(1024) if not d: + vvvv("%s: received nothing, bailing out" % self.host) return None data += d + vvvv("%s: got the header, unpacking" % self.host) data_len = struct.unpack('Q',data[:header_len])[0] data = data[header_len:] + vvvv("%s: data received so far (expecting %d): %d" % (self.host,data_len,len(data))) while len(data) < data_len: d = self.conn.recv(1024) if not d: + vvvv("%s: received nothing, bailing out" % self.host) return None data += d + vvvv("%s: received all of the data, returning" % self.host) return data except socket.timeout: raise errors.AnsibleError("timed out while waiting to receive data") diff --git a/library/utilities/accelerate b/library/utilities/accelerate index 371e4adb92b..da16d8395e8 100644 --- a/library/utilities/accelerate +++ b/library/utilities/accelerate @@ -85,8 +85,22 @@ PIDFILE = os.path.expanduser("~/.accelerate.pid") # which leaves room for the TCP/IP header CHUNK_SIZE=10240 -def log(msg): - syslog.syslog(syslog.LOG_NOTICE|syslog.LOG_DAEMON, msg) +# FIXME: this all should be moved to module_common, as it's +# pretty much a copy from the callbacks/util code +DEBUG_LEVEL=0 +def log(msg, cap=0): + global DEBUG_LEVEL + if cap >= DEBUG_LEVEL: + syslog.syslog(syslog.LOG_NOTICE|syslog.LOG_DAEMON, msg) + +def vv(msg): + log(msg, cap=2) + +def vvv(msg): + log(msg, cap=3) + +def vvvv(msg): + log(msg, cap=4) if os.path.exists(PIDFILE): try: @@ -114,7 +128,7 @@ def daemonize_self(module, password, port, minutes): try: pid = os.fork() if pid > 0: - log("exiting pid %s" % pid) + vvv("exiting pid %s" % pid) # exit first parent module.exit_json(msg="daemonized accelerate on port %s for %s minutes" % (port, minutes)) except OSError, e: @@ -134,7 +148,7 @@ def daemonize_self(module, password, port, minutes): pid_file = open(PIDFILE, "w") pid_file.write("%s" % pid) pid_file.close() - log("pidfile written") + vvv("pidfile written") sys.exit(0) except OSError, e: log("fork #2 failed: %d (%s)" % (e.errno, e.strerror)) @@ -162,52 +176,64 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): def recv_data(self): header_len = 8 # size of a packed unsigned long long data = b"" + vvvv("in recv_data(), waiting for the header") while len(data) < header_len: d = self.request.recv(1024) if not d: + vvv("received nothing, bailing out") return None data += d + vvvv("in recv_data(), got the header, unpacking") data_len = struct.unpack('Q',data[:header_len])[0] data = data[header_len:] + vvvv("data received so far (expecting %d): %d" % (data_len,len(data))) while len(data) < data_len: d = self.request.recv(1024) if not d: + vvv("received nothing, bailing out") return None data += d + vvvv("received all of the data, returning") return data def handle(self): while True: - #log("waiting for data") + vvvv("waiting for data") data = self.recv_data() if not data: + vvvv("received nothing back from recv_data(), breaking out") break try: - #log("got data, decrypting") + vvvv("got data, decrypting") data = self.server.key.Decrypt(data) - #log("decryption done") + vvvv("decryption done") except: - log("bad decrypt, skipping...") + vv("bad decrypt, skipping...") data2 = json.dumps(dict(rc=1)) data2 = self.server.key.Encrypt(data2) send_data(client, data2) return - #log("loading json from the data") + vvvv("loading json from the data") data = json.loads(data) mode = data['mode'] response = {} if mode == 'command': + vvvv("received a command request, running it") response = self.command(data) elif mode == 'put': + vvvv("received a put request, putting it") response = self.put(data) elif mode == 'fetch': + vvvv("received a fetch request, getting it") response = self.fetch(data) data2 = json.dumps(response) data2 = self.server.key.Encrypt(data2) + vvvv("sending the response back to the controller") self.send_data(data2) + vvvv("done sending the response") def command(self, data): if 'cmd' not in data: @@ -217,14 +243,14 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): if 'executable' not in data: return dict(failed=True, msg='internal error: executable is required') - #log("executing: %s" % data['cmd']) + vvvv("executing: %s" % data['cmd']) rc, stdout, stderr = self.server.module.run_command(data['cmd'], executable=data['executable'], close_fds=True) if stdout is None: stdout = '' if stderr is None: stderr = '' - #log("got stdout: %s" % stdout) - #log("got stderr: %s" % stderr) + vvvv("got stdout: %s" % stdout) + vvvv("got stderr: %s" % stderr) return dict(rc=rc, stdout=stdout, stderr=stderr) @@ -235,7 +261,7 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): try: fd = file(data['in_path'], 'rb') fstat = os.stat(data['in_path']) - log("FETCH file is %d bytes" % fstat.st_size) + vvv("FETCH file is %d bytes" % fstat.st_size) while fd.tell() < fstat.st_size: data = fd.read(CHUNK_SIZE) last = False @@ -276,7 +302,7 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): final_path = None final_user = None if 'user' in data and data.get('user') != getpass.getuser(): - log("the target user doesn't match this user, we'll move the file into place via sudo") + vv("the target user doesn't match this user, we'll move the file into place via sudo") (fd,out_path) = tempfile.mkstemp(prefix='ansible.', dir=os.path.expanduser('~/.ansible/tmp/')) out_fd = os.fdopen(fd, 'w', 0) final_path = data['out_path'] @@ -306,11 +332,11 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): log("failed to put the file: %s" % tb) return dict(failed=True, stdout="Could not write the file") finally: - #log("wrote %d bytes" % bytes) + vvvv("wrote %d bytes" % bytes) out_fd.close() if final_path: - log("moving %s to %s" % (out_path, final_path)) + vvv("moving %s to %s" % (out_path, final_path)) args = ['sudo','cp',out_path,final_path] rc, stdout, stderr = self.server.module.run_command(args, close_fds=True) if rc != 0: @@ -334,7 +360,7 @@ def daemonize(module, password, port, minutes): server = ThreadedTCPServer(("0.0.0.0", port), ThreadedTCPRequestHandler, module, password) server.allow_reuse_address = True - log("serving!") + vv("serving!") server.serve_forever(poll_interval=1.0) except Exception, e: tb = traceback.format_exc() @@ -342,11 +368,13 @@ def daemonize(module, password, port, minutes): sys.exit(0) def main(): + global DEBUG_LEVEL module = AnsibleModule( argument_spec = dict( port=dict(required=False, default=5099), password=dict(required=True), minutes=dict(required=False, default=30), + debug=dict(required=False, default=0, type='int') ), supports_check_mode=True ) @@ -354,10 +382,13 @@ def main(): password = base64.b64decode(module.params['password']) port = int(module.params['port']) minutes = int(module.params['minutes']) + debug = int(module.params['debug']) if not HAS_KEYCZAR: module.fail_json(msg="keyczar is not installed") + DEBUG_LEVEL=debug + daemonize(module, password, port, minutes)