From 6d580aea029f19d77cb5d97e0186e0f8beeaed35 Mon Sep 17 00:00:00 2001 From: Michael DeHaan Date: Fri, 25 May 2012 18:44:29 -0400 Subject: [PATCH] As part of a precursor to other refactoring, make returns less list-like throughout runner. --- lib/ansible/callbacks.py | 2 + lib/ansible/runner.py | 273 +++++++++++++++++++++------------------ 2 files changed, 150 insertions(+), 125 deletions(-) diff --git a/lib/ansible/callbacks.py b/lib/ansible/callbacks.py index 256ceb5586e..37c6666062a 100644 --- a/lib/ansible/callbacks.py +++ b/lib/ansible/callbacks.py @@ -128,6 +128,8 @@ class CliRunnerCallbacks(DefaultRunnerCallbacks): self._on_any(host,res) def on_unreachable(self, host, res): + if type(res) == dict: + res = res.get('msg','') print "%s | FAILED => %s" % (host, res) if self.options.tree: utils.write_tree_file(self.options.tree, host, utils.bigjson(dict(failed=True, msg=res))) diff --git a/lib/ansible/runner.py b/lib/ansible/runner.py index 9e903871e00..5e86c711f0f 100644 --- a/lib/ansible/runner.py +++ b/lib/ansible/runner.py @@ -68,6 +68,37 @@ def _executor_hook(job_queue, result_queue): ################################################ +class ReturnData(object): + + __slots__ = [ 'result', 'comm_ok', 'executed_str', 'host' ] + + def __init__(self, host=None, result=None, comm_ok=True, executed_str=''): + self.host = host + self.result = result + self.comm_ok = comm_ok + self.executed_str = executed_str + + if type(self.result) in [ str, unicode ]: + self.result = utils.parse_json(self.result) + + if host is None: + raise Exception("host not set") + if type(self.result) != dict: + raise Exception("dictionary result expected") + + def communicated_ok(self): + return self.comm_ok + + def is_successful(self): + if not self.comm_ok: + return False + else: + if 'failed' in self.result: + return False + if self.result.get('rc',0) != 0: + return False + return True + class Runner(object): def __init__(self, @@ -103,7 +134,7 @@ class Runner(object): conditional : only execute if this string, evaluated, is True callbacks : output callback class sudo : log in as remote user and immediately sudo to root - module_vars : provides additional variables to a template. FIXME: just use module_args, remove + module_vars : provides additional variables to a template. FIXME: factor this out is_playbook : indicates Runner is being used by a playbook. affects behavior in various ways. inventory : inventory object, if host_list is not provided """ @@ -162,21 +193,6 @@ class Runner(object): # ***************************************************** - def _return_from_module(self, conn, host, result, err, executed=None): - ''' helper function to handle JSON parsing of results ''' - - try: - result = utils.parse_json(result) - if executed is not None: - result['invocation'] = executed - if 'stderr' in result: - err="%s%s"%(err,result['stderr']) - return [host, True, result, err] - except Exception, e: - return [host, False, "%s/%s/%s" % (str(e), result, executed), err] - - # ***************************************************** - def _delete_remote_files(self, conn, files): ''' deletes one or more remote files ''' @@ -185,7 +201,7 @@ class Runner(object): for filename in files: if filename.find('/tmp/') == -1: raise Exception("not going to happen") - self._exec_command(conn, "rm -rf %s" % filename, None) + self._low_level_exec_command(conn, "rm -rf %s" % filename, None) # ***************************************************** @@ -193,7 +209,7 @@ class Runner(object): ''' transfers a module file to the remote side to execute it, but does not execute it yet ''' outpath = self._copy_module(conn, tmp, module) - self._exec_command(conn, "chmod +x %s" % outpath, tmp) + self._low_level_exec_command(conn, "chmod +x %s" % outpath, tmp) return outpath # ***************************************************** @@ -263,6 +279,7 @@ class Runner(object): def _execute_module(self, conn, tmp, remote_module_path, args, async_jid=None, async_module=None, async_limit=None): + ''' runs a module that has already been transferred ''' inject = self.setup_cache.get(conn.host,{}).copy() @@ -272,7 +289,8 @@ class Runner(object): conditional = utils.double_template(self.conditional, inject, self.setup_cache) if not eval(conditional): - return [ utils.smjson(dict(skipped=True)), None, 'skipped' ] + result = utils.smjson(dict(skipped=True)) + return ReturnData(host=conn.host, result=result) if self.module_name == 'setup': if not args: @@ -292,9 +310,12 @@ class Runner(object): else: cmd = " ".join([str(x) for x in [remote_module_path, async_jid, async_limit, async_module, argsfile]]) - res, err = self._exec_command(conn, cmd, tmp, sudoable=True) - client_executed_str = "%s %s" % (module_name_tail, args.strip()) - return ( res, err, client_executed_str ) + res = self._low_level_exec_command(conn, cmd, tmp, sudoable=True) + result1 = utils.parse_json(res) + + executed_str = "%s %s" % (module_name_tail, args.strip()) + + return ReturnData(host=conn.host, result=res, executed_str=executed_str) # ***************************************************** @@ -336,18 +357,15 @@ class Runner(object): # ***************************************************** - def _execute_raw(self, conn, host, tmp): + def _execute_raw(self, conn, tmp): ''' execute a non-module command for bootstrapping, or if there's no python on a device ''' - stdout, stderr = self._exec_command( conn, self.module_args, tmp, sudoable = True ) + stdout = self._low_level_exec_command( conn, self.module_args, tmp, sudoable = True ) data = dict(stdout=stdout) - if stderr: - data['stderr'] = stderr - return (host, True, data, '') + return ReturnData(host=conn.host, results=data) # *************************************************** - - def _execute_normal_module(self, conn, host, tmp, module_name): + def _execute_normal_module(self, conn, tmp, module_name): ''' transfer & execute a module that is not 'copy' or 'template' ''' # shell and command are the same module @@ -356,22 +374,17 @@ class Runner(object): self.module_args += " #USE_SHELL" module = self._transfer_module(conn, tmp, module_name) - (result, err, executed) = self._execute_module(conn, tmp, module, self.module_args) - - (host, ok, data, err) = self._return_from_module(conn, host, result, err, executed) - - if ok: - self._add_result_to_setup_cache(conn, data) - - return (host, ok, data, err) + exec_rc = self._execute_module(conn, tmp, module, self.module_args) + if exec_rc.is_successful(): + self._add_result_to_setup_cache(conn, exec_rc.result) + return exec_rc # ***************************************************** - def _execute_async_module(self, conn, host, tmp, module_name): + def _execute_async_module(self, conn, tmp, module_name): ''' transfer the given module name, plus the async module, then run it ''' - # hack to make the 'shell' module keyword really be executed - # by the command module + # shell and command module are the same module_args = self.module_args if module_name == 'shell': module_name = 'command' @@ -379,17 +392,16 @@ class Runner(object): async = self._transfer_module(conn, tmp, 'async_wrapper') module = self._transfer_module(conn, tmp, module_name) - (result, err, executed) = self._execute_module(conn, tmp, async, module_args, + + return self._execute_module(conn, tmp, async, module_args, async_module=module, async_jid=self.generated_jid, async_limit=self.background ) - return self._return_from_module(conn, host, result, err, executed) - # ***************************************************** - def _execute_copy(self, conn, host, tmp): + def _execute_copy(self, conn, tmp): ''' handler for file transfer operations ''' # load up options @@ -397,12 +409,12 @@ class Runner(object): source = options.get('src', None) dest = options.get('dest', None) if (source is None and not 'first_available_file' in self.module_vars) or dest is None: - return (host, True, dict(failed=True, msg="src and dest are required"), '') + result=dict(failed=True, msg="src and dest are required") + return ReturnData(host=conn.host, result=result) # apply templating to source argument inject = self.setup_cache.get(conn.host,{}) - # FIXME: break duplicate code up into subfunction # if we have first_available_file in our vars # look up the files and use the first one we find as src if 'first_available_file' in self.module_vars: @@ -414,7 +426,8 @@ class Runner(object): found = True break if not found: - return (host, True, dict(failed=True, msg="could not find src in first_available_file list"), '') + results=dict(failed=True, msg="could not find src in first_available_file list") + return ReturnData(host=conn.host, is_error=True, results=results) source = utils.template(source, inject, self.setup_cache) @@ -428,17 +441,16 @@ class Runner(object): # run the copy module args = "src=%s dest=%s" % (tmp_src, dest) - (result1, err, executed) = self._execute_module(conn, tmp, module, args) - (host, ok, data, err) = self._return_from_module(conn, host, result1, err, executed) + exec_rc = self._execute_module(conn, tmp, module, args) - if ok: - return self._chain_file_module(conn, tmp, data, err, options, executed) + if exec_rc.is_successful(): + return self._chain_file_module(conn, tmp, exec_rc, options) else: - return (host, ok, data, err) + return exec_rc # ***************************************************** - def _execute_fetch(self, conn, host, tmp): + def _execute_fetch(self, conn, tmp): ''' handler for fetch operations ''' # load up options @@ -446,7 +458,8 @@ class Runner(object): source = options.get('src', None) dest = options.get('dest', None) if source is None or dest is None: - return (host, True, dict(failed=True, msg="src and dest are required"), '') + results = dict(failed=True, msg="src and dest are required") + return ReturnData(host=conn.host, error=True, results=results) # apply templating to source argument inject = self.setup_cache.get(conn.host,{}) @@ -456,14 +469,14 @@ class Runner(object): dest = utils.template(dest, inject, self.setup_cache) # files are saved in dest dir, with a subdir for each host, then the filename - dest = "%s/%s/%s" % (utils.path_dwim(self.basedir, dest), host, source) + dest = "%s/%s/%s" % (utils.path_dwim(self.basedir, dest), conn.host, source) dest = dest.replace("//","/") # compare old and new md5 for support of change hooks local_md5 = None if os.path.exists(dest): local_md5 = os.popen("md5sum %s" % dest).read().split()[0] - remote_md5 = self._exec_command(conn, "md5sum %s" % source, tmp, True)[0].split()[0] + remote_md5 = self._low_level_exec_command(conn, "md5sum %s" % source, tmp, True)[0].split()[0] if remote_md5 != local_md5: # create the containing directories, if needed @@ -473,35 +486,39 @@ class Runner(object): conn.fetch_file(source, dest) new_md5 = os.popen("md5sum %s" % dest).read().split()[0] if new_md5 != remote_md5: - return (host, True, dict(failed=True, msg="md5 mismatch", md5sum=new_md5), '') - return (host, True, dict(changed=True, md5sum=new_md5), '') + result = dict(failed=True, msg="md5 mismatch", md5sum=new_md5) + return ReturnData(host=conn.host, result=result) + result = dict(changed=True, md5sum=new_md5) + return ReturnData(host=conn.host, result=result) else: - return (host, True, dict(changed=False, md5sum=local_md5), '') + result = dict(changed=False, md5sum=local_md5) + return ReturnData(host=conn.host, result=result) # ***************************************************** - def _chain_file_module(self, conn, tmp, data, err, options, executed): + def _chain_file_module(self, conn, tmp, exec_rc, options): + ''' handles changing file attribs after copy/template operations ''' - old_changed = data.get('changed', False) + old_changed = exec_rc.result.get('changed', False) module = self._transfer_module(conn, tmp, 'file') args = ' '.join([ "%s=%s" % (k,v) for (k,v) in options.items() ]) - (result2, err2, executed2) = self._execute_module(conn, tmp, module, args) - results2 = self._return_from_module(conn, conn.host, result2, err2, executed) - (host, ok, data2, err2) = results2 - if ok: - new_changed = data2.get('changed', False) - data.update(data2) - else: - new_changed = False + exec_rc2 = self._execute_module(conn, tmp, module, args) + + new_changed = False + if exec_rc2.is_successful(): + new_changed = exec_rc2.result.get('changed', False) + exec_rc.result.update(exec_rc2.result) + if old_changed or new_changed: - data['changed'] = True - return (host, ok, data, "%s%s"%(err,err2)) + exec_rc.result['changed'] = True + + return exec_rc # ***************************************************** - def _execute_template(self, conn, host, tmp): + def _execute_template(self, conn, tmp): ''' handler for template operations ''' # load up options @@ -510,7 +527,8 @@ class Runner(object): dest = options.get('dest', None) metadata = options.get('metadata', None) if (source is None and 'first_available_file' not in self.module_vars) or dest is None: - return (host, True, dict(failed=True, msg="src and dest are required"), '') + result = dict(failed=True, msg="src and dest are required") + return ReturnData(host=conn.host, comm_ok=False, result=result) # apply templating to source argument so vars can be used in the path inject = self.setup_cache.get(conn.host,{}) @@ -526,11 +544,13 @@ class Runner(object): found = True break if not found: - return (host, True, dict(failed=True, msg="could not find src in first_available_file list"), '') + result = dict(failed=True, msg="could not find src in first_available_file list") + return ReturnData(host=conn.host, comm_ok=False, result=result) + source = utils.template(source, inject, self.setup_cache) - (host, ok, data, err) = (None, None, None, None) + #(host, ok, data, err) = (None, None, None, None) if not self.is_playbook: @@ -548,14 +568,14 @@ class Runner(object): # run the slurp module to get the metadata file args = "src=%s" % metadata - (result1, err, executed) = self._execute_module(conn, tmp, slurp_module, args) - result1 = utils.json_loads(result1) - if not 'content' in result1 or result1.get('encoding','base64') != 'base64': - result1['failed'] = True - return self._return_from_module(conn, host, result1, err, executed) - content = base64.b64decode(result1['content']) + result1 = self._execute_module(conn, tmp, slurp_module, args) + if not 'content' in result1.result or result1.result.get('encoding','base64') != 'base64': + result1.result['failed'] = True + return result1 + content = base64.b64decode(result1.result['content']) inject = utils.json_loads(content) + # install the template module copy_module = self._transfer_module(conn, tmp, 'copy') @@ -564,37 +584,40 @@ class Runner(object): resultant = utils.template_from_file(utils.path_dwim(self.basedir, source), inject, self.setup_cache, no_engine=False) except Exception, e: - return (host, False, dict(failed=True, msg=str(e)), '') + result = dict(failed=True, msg=str(e)) + return ReturnData(host=conn.host, comm_ok=False, result=result) + xfered = self._transfer_str(conn, tmp, 'source', resultant) # run the COPY module args = "src=%s dest=%s" % (xfered, dest) - (result1, err, executed) = self._execute_module(conn, tmp, copy_module, args) - (host, ok, data, err) = self._return_from_module(conn, host, result1, err, executed) + exec_rc = self._execute_module(conn, tmp, copy_module, args) # modify file attribs if needed - if ok: - executed = executed.replace("copy","template",1) - return self._chain_file_module(conn, tmp, data, err, options, executed) + if exec_rc.comm_ok: + exec_rc.executed_str = exec_rc.executed_str.replace("copy","template",1) + return self._chain_file_module(conn, tmp, exec_rc, options) else: - return (host, ok, data, err) + return exec_rc # ***************************************************** def _executor(self, host): try: - (host, ok, data, err) = self._executor_internal(host) - if not ok: - self.callbacks.on_unreachable(host, data) - return (host, ok, data) + exec_rc = self._executor_internal(host) + if type(exec_rc) != ReturnData: + raise Exception("unexpected return type: %s" % type(exec_rc)) + if not exec_rc.comm_ok: + self.callbacks.on_unreachable(host, exec_rc.result) + return exec_rc except errors.AnsibleError, ae: msg = str(ae) self.callbacks.on_unreachable(host, msg) - return [host, False, msg] + return ReturnData(host=host, comm_ok=False, result=dict(failed=True, msg=msg)) except Exception: msg = traceback.format_exc() self.callbacks.on_unreachable(host, msg) - return [host, False, msg] + return ReturnData(host=host, comm_ok=False, result=dict(failed=True, msg=msg)) def _executor_internal(self, host): ''' callback executed in parallel for each host. returns (hostname, connected_ok, extra) ''' @@ -606,7 +629,8 @@ class Runner(object): try: conn = self.connector.connect(host, port) except errors.AnsibleConnectionFailed, e: - return [ host, False, "FAILED: %s" % str(e), None ] + result = dict(failed=True, msg="FAILED: %s" % str(e)) + return ReturnData(host=host, comm_ok=False, result=result) cache = self.setup_cache.get(host, {}) module_name = utils.template(self.module_name, cache, self.setup_cache) @@ -615,56 +639,53 @@ class Runner(object): result = None if self.module_name == 'copy': - result = self._execute_copy(conn, host, tmp) + result = self._execute_copy(conn, tmp) elif self.module_name == 'fetch': - result = self._execute_fetch(conn, host, tmp) + result = self._execute_fetch(conn, tmp) elif self.module_name == 'template': - result = self._execute_template(conn, host, tmp) + result = self._execute_template(conn, tmp) elif self.module_name == 'raw': - result = self._execute_raw(conn, host, tmp) + result = self._execute_raw(conn, tmp) else: if self.background == 0: - result = self._execute_normal_module(conn, host, tmp, module_name) + result = self._execute_normal_module(conn, tmp, module_name) else: - result = self._execute_async_module(conn, host, tmp, module_name) + result = self._execute_async_module(conn, tmp, module_name) self._delete_remote_files(conn, tmp) conn.close() - (host, connect_ok, data, err) = result - if not connect_ok: + if not result.comm_ok: + # connection or parsing errors... self.callbacks.on_unreachable(host, data) else: - if 'failed' in data or 'rc' in data and str(data['rc']) != '0': - self.callbacks.on_failed(host, data) - elif 'skipped' in data: - self.callbacks.on_skipped(host) + data = result.result + if 'skipped' in data: + self.callbacks.on_skipped(result.host) + elif not result.is_successful(): + self.callbacks.on_failed(result.host, result.result) else: - self.callbacks.on_ok(host, data) - - if err: - if self.debug or data.get('parsed', True) == False: - self.callbacks.on_error(host, err) + self.callbacks.on_ok(result.host, result.result) return result # ***************************************************** - def _exec_command(self, conn, cmd, tmp, sudoable=False): + def _low_level_exec_command(self, conn, cmd, tmp, sudoable=False): ''' execute a command string over SSH, return the output ''' sudo_user = self.sudo_user stdin, stdout, stderr = conn.exec_command(cmd, tmp, sudo_user, sudoable=sudoable) - err=None out=None - if type(stderr) != str: - err="\n".join(stderr.readlines()) - else: - err=stderr + if type(stdout) != str: out="\n".join(stdout.readlines()) else: out=stdout - return (out,err) + + + + # sudo mode paramiko doesn't capture stderr, so not relaying here either... + return out # ***************************************************** @@ -679,11 +700,11 @@ class Runner(object): if self.remote_user != 'root': cmd = "mkdir -p %s && %s" % (basetmp, cmd) - result, err = self._exec_command(conn, cmd, None, sudoable=False) + result = self._low_level_exec_command(conn, cmd, None, sudoable=False) cleaned = result.split("\n")[0].strip() + '/' if self.remote_user != 'root': cmd = 'chmod a+x %s' % cleaned - result, err = self._exec_command(conn, cmd, None, sudoable=False) + self._low_level_exec_command(conn, cmd, None, sudoable=False) return cleaned @@ -743,11 +764,13 @@ class Runner(object): return None for result in results: - (host, contacted_ok, result) = result - if contacted_ok: - results2["contacted"][host] = result + host = result.host + if host is None: + raise Exception("internal error, host not set") + if result.communicated_ok(): + results2["contacted"][host] = result.result else: - results2["dark"][host] = result + results2["dark"][host] = result.result # hosts which were contacted but never got a chance to return for host in self.inventory.list_hosts(self.pattern):