Reset workers list when recreating processes

Also renames some things to be more accurate
This commit is contained in:
James Cammarata 2015-11-06 12:03:20 -05:00
parent 1655b08c77
commit 59b67a2f68

View file

@ -93,9 +93,9 @@ class TaskQueueManager:
# plugins for inter-process locking.
self._connection_lockfile = tempfile.TemporaryFile()
def _initialize_processes(self, num):
self._workers = []
def _initialize_workers(self, num):
for i in xrange(num):
main_q = multiprocessing.Queue()
rslt_q = multiprocessing.Queue()
@ -180,7 +180,7 @@ class TaskQueueManager:
# Fork # of forks, # of hosts or serial, whichever is lowest
contenders = [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))]
contenders = [ v for v in contenders if v is not None and v > 0 ]
self._initialize_workers(min( contenders ))
self._initialize_processes(min(contenders))
if not self._callbacks_loaded:
self.load_callbacks()
@ -224,16 +224,16 @@ class TaskQueueManager:
# and run the play using the strategy and cleanup on way out
play_return = strategy.run(iterator, play_context)
self._cleanup_workers()
self._cleanup_processes()
return play_return
def cleanup(self):
self._display.debug("RUNNING CLEANUP")
self.terminate()
self._final_q.close()
self._cleanup_workers()
self._cleanup_processes()
def _cleanup_workers(self):
def _cleanup_processes(self):
if self._result_prc:
self._result_prc.terminate()