Merge pull request #38 from skvidal/master

two fixes here
This commit is contained in:
Michael DeHaan 2012-02-27 05:13:09 -08:00
commit 8fc69d3055

View file

@ -21,16 +21,23 @@
import fnmatch import fnmatch
import multiprocessing import multiprocessing
import signal
import os import os
import json import json
import traceback import traceback
import paramiko # non-core dependency import paramiko # non-core dependency
import ansible.constants as C import ansible.constants as C
def _executor_hook(x): def _executor_hook(job_queue, result_queue):
''' callback used by multiprocessing pool ''' ''' callback used by multiprocessing pool '''
(runner, host) = x signal.signal(signal.SIGINT, signal.SIG_IGN)
return runner._executor(host) while not job_queue.empty():
try:
job = job_queue.get(block=False)
runner, host = job
result_queue.put(runner._executor(host))
except Queue.Empty:
pass
class Runner(object): class Runner(object):
@ -108,6 +115,8 @@ class Runner(object):
ssh.connect(host, username=self.remote_user, allow_agent=True, ssh.connect(host, username=self.remote_user, allow_agent=True,
look_for_keys=True, password=self.remote_pass) look_for_keys=True, password=self.remote_pass)
return [ True, ssh ] return [ True, ssh ]
except paramiko.AuthenticationException, e:
return [ False, str(e) ]
except: except:
# it failed somehow, return the failure string # it failed somehow, return the failure string
return [ False, traceback.format_exc() ] return [ False, traceback.format_exc() ]
@ -294,10 +303,34 @@ class Runner(object):
# _executor_hook does all of the work # _executor_hook does all of the work
hosts = [ (self,x) for x in hosts ] hosts = [ (self,x) for x in hosts ]
if self.forks > 1: if self.forks > 1:
pool = multiprocessing.Pool(self.forks) job_queue = multiprocessing.Queue()
results = pool.map(_executor_hook, hosts) result_queue = multiprocessing.Queue()
for i in hosts:
job_queue.put(i)
workers = []
for i in range(self.forks):
tmp = multiprocessing.Process(target=_executor_hook,
args=(job_queue, result_queue))
tmp.start()
workers.append(tmp)
try:
for worker in workers:
worker.join()
except KeyboardInterrupt:
print 'parent received ctrl-c'
for worker in workers:
worker.terminate()
worker.join()
results = []
while not result_queue.empty():
results.append(result_queue.get(block=False))
else: else:
results = [ _executor_hook(x) for x in hosts ] results = [ x._executor(h) for (x,h) in hosts ]
# sort hosts by ones we successfully contacted # sort hosts by ones we successfully contacted
# and ones we did not so that we can return a # and ones we did not so that we can return a
@ -307,12 +340,21 @@ class Runner(object):
"contacted" : {}, "contacted" : {},
"dark" : {} "dark" : {}
} }
hosts_with_results = []
for x in results: for x in results:
(host, is_ok, result) = x (host, is_ok, result) = x
hosts_with_results.append(host)
if not is_ok: if not is_ok:
results2["dark"][host] = result results2["dark"][host] = result
else: else:
results2["contacted"][host] = result results2["contacted"][host] = result
# hosts which were contacted but never got a chance
# to return a result before we exited/ctrl-c'd
# perhaps these shouldn't be 'dark' but I'm not sure if they fit
# anywhere else.
for host in self.match_hosts(self.pattern):
if host not in hosts_with_results:
results2["dark"][host] = {}
return results2 return results2