Compare commits

...

5 commits

Author SHA1 Message Date
James Cammarata 9b6b924e9c Use events instead of sleep for result reading 2017-10-02 12:25:00 -05:00
James Cammarata f860ea10c4 Updating locking in plugins after rebasing 2017-10-02 12:25:00 -05:00
James Cammarata dbd3a0828a Use threading instead of concurrent.futures for py2 and py3 2017-10-02 12:25:00 -05:00
James Cammarata 5e63647517 Use persistent threads with job/results queues
Instead of creating a one-time use thread for every host/task
2017-10-02 12:25:00 -05:00
James Cammarata f95160723d Start of work to use threading instead of forking
Rather than using multiprocessing (Process and Queue objects) to do worker
tasks in Ansible:

* Using concurrent.futures
* Using ThreadPoolExecutor
* Making PluginLoader thread-safe
* Gutting a lot of code dealing with message passing
2017-10-02 12:25:00 -05:00
5 changed files with 228 additions and 127 deletions

View file

@ -0,0 +1,116 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import sys
import time
import traceback
from jinja2.exceptions import TemplateNotFound
from ansible.errors import AnsibleConnectionFailure
from ansible.executor.task_executor import TaskExecutor
from ansible.executor.task_result import TaskResult
from ansible.module_utils._text import to_text
try:
from __main__ import display
except ImportError:
from ansible.utils.display import Display
display = Display()
# Export the name this module actually defines; listing a missing name in
# __all__ makes "from <module> import *" fail with AttributeError.
__all__ = ['run_worker']
def run_worker(tqm, shared_loader_obj):
    '''
    Worker thread entry point, which uses TaskExecutor to run jobs read
    from the job queue and pushes a TaskResult for each one into the
    results queue for reading later.

    :arg tqm: the object jobs are read from (``get_job()``) and results are
        written to (``put_result()``). Its ``_terminated`` flag ends the
        loop, its ``_loader`` is passed to TaskExecutor, and it is also
        passed to TaskExecutor in the result-queue position.
    :arg shared_loader_obj: passed through to TaskExecutor unchanged.
    '''

    # import cProfile, pstats, StringIO
    # pr = cProfile.Profile()
    # pr.enable()

    display.debug("STARTING WORKER")
    while not tqm._terminated:
        job = tqm.get_job()
        if job is None:
            # nothing is queued right now, so pause briefly and poll again
            time.sleep(0.0001)
            continue

        display.debug("WORKER GOT A JOB")
        (host, task, play_context, task_vars) = job

        try:
            # execute the task and build a TaskResult from the result
            display.debug("running TaskExecutor() for %s/%s" % (host, task))
            executor_result = TaskExecutor(
                host,
                task,
                task_vars,
                play_context,
                None,  # new_stdin
                tqm._loader,
                shared_loader_obj,
                tqm,  # rslt_q
            ).run()
            display.debug("done running TaskExecutor() for %s/%s" % (host, task))

            # put the result on the result queue
            display.debug("sending task result")
            tqm.put_result(TaskResult(
                host,
                task,
                executor_result,
            ))
            display.debug("done task result")

        except AnsibleConnectionFailure:
            tqm.put_result(TaskResult(
                host,
                task,
                dict(unreachable=True),
            ))

        except Exception as e:
            if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound):
                try:
                    tqm.put_result(TaskResult(
                        host,
                        task,
                        dict(failed=True, exception=to_text(traceback.format_exc()), stdout=''),
                    ))
                except Exception:
                    # reporting the failure itself failed; log it, but do not
                    # trap SystemExit/KeyboardInterrupt the way a bare
                    # "except:" would
                    display.debug(u"WORKER EXCEPTION: %s" % to_text(e))
                    display.debug(u"WORKER TRACEBACK: %s" % to_text(traceback.format_exc()))
            else:
                # no result is reported for these errors; leave a trace so
                # the dropped exception is not completely silent
                display.debug(u"WORKER IGNORING EXCEPTION: %s" % to_text(e))

    # pr.disable()
    # s = StringIO.StringIO()
    # sortby = 'time'
    # ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
    # ps.print_stats()
    # with open('worker_%06d.stats' % os.getpid(), 'w') as f:
    #     f.write(s.getvalue())

    display.debug("WORKER PROCESS EXITING")

View file

@ -299,14 +299,13 @@ class TaskExecutor:
templar = Templar(loader=self._loader, shared_loader_obj=self._shared_loader_obj, variables=self._job_vars) templar = Templar(loader=self._loader, shared_loader_obj=self._shared_loader_obj, variables=self._job_vars)
res['_ansible_item_label'] = templar.template(label) res['_ansible_item_label'] = templar.template(label)
self._rslt_q.put( self._rslt_q.put_result(
TaskResult( TaskResult(
self._host.name, self._host,
self._task._uuid, self._task,
res, res,
task_fields=self._task.dump_attrs(), task_fields=self._task.dump_attrs(),
), ),
block=False,
) )
results.append(res) results.append(res)
del task_vars[loop_var] del task_vars[loop_var]
@ -591,7 +590,7 @@ class TaskExecutor:
result['_ansible_retry'] = True result['_ansible_retry'] = True
result['retries'] = retries result['retries'] = retries
display.debug('Retrying task, attempt %d of %d' % (attempt, retries)) display.debug('Retrying task, attempt %d of %d' % (attempt, retries))
self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, result, task_fields=self._task.dump_attrs()), block=False) self._rslt_q.put_result(TaskResult(self._host, self._task, result, task_fields=self._task.dump_attrs()))
time.sleep(delay) time.sleep(delay)
else: else:
if retries > 1: if retries > 1:

View file

@ -19,19 +19,22 @@
from __future__ import (absolute_import, division, print_function) from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type
import multiprocessing import threading
import os import os
import tempfile import tempfile
from collections import deque
from ansible import constants as C from ansible import constants as C
from ansible.errors import AnsibleError from ansible.errors import AnsibleError
from ansible.executor.play_iterator import PlayIterator from ansible.executor.play_iterator import PlayIterator
from ansible.executor.process.threading import run_worker
from ansible.executor.stats import AggregateStats from ansible.executor.stats import AggregateStats
from ansible.module_utils.six import string_types from ansible.module_utils.six import string_types
from ansible.module_utils._text import to_text from ansible.module_utils._text import to_text
from ansible.playbook.block import Block from ansible.playbook.block import Block
from ansible.playbook.play_context import PlayContext from ansible.playbook.play_context import PlayContext
from ansible.plugins.loader import callback_loader, strategy_loader, module_loader from ansible.plugins.loader import action_loader, connection_loader, filter_loader, test_loader, lookup_loader, callback_loader, strategy_loader, module_loader
from ansible.plugins.callback import CallbackBase from ansible.plugins.callback import CallbackBase
from ansible.template import Templar from ansible.template import Templar
from ansible.utils.helpers import pct_to_int from ansible.utils.helpers import pct_to_int
@ -48,6 +51,23 @@ except ImportError:
__all__ = ['TaskQueueManager'] __all__ = ['TaskQueueManager']
# TODO: this should probably be in the plugins/__init__.py, with
# a smarter mechanism to set all of the attributes based on
# the loaders created there
class SharedPluginLoaderObj:
    '''
    Plain attribute holder that bundles the plugin loaders so they can
    be handed around as a single object.
    '''

    def __init__(self):
        # every attribute carries the name of the module-level loader it
        # refers to
        (
            self.action_loader,
            self.connection_loader,
            self.filter_loader,
            self.test_loader,
            self.lookup_loader,
            self.module_loader,
        ) = (
            action_loader,
            connection_loader,
            filter_loader,
            test_loader,
            lookup_loader,
            module_loader,
        )
class TaskQueueManager: class TaskQueueManager:
''' '''
@ -100,18 +120,59 @@ class TaskQueueManager:
self._failed_hosts = dict() self._failed_hosts = dict()
self._unreachable_hosts = dict() self._unreachable_hosts = dict()
self._final_q = multiprocessing.Queue()
# A temporary file (opened pre-fork) used by connection # A temporary file (opened pre-fork) used by connection
# plugins for inter-process locking. # plugins for inter-process locking.
self._connection_lockfile = tempfile.TemporaryFile() self._connection_lockfile = tempfile.TemporaryFile()
self._job_queue = deque()
self._job_queue_lock = threading.Lock()
self._res_queue = deque()
self._res_queue_lock = threading.Lock()
self._res_ready = threading.Event()
def _put_in_queue(self, data, queue, lock):
lock.acquire()
queue.appendleft(data)
lock.release()
def _pop_off_queue(self, queue, lock):
try:
data = None
lock.acquire()
data = queue.pop()
except:
pass
finally:
lock.release()
return data
def put_job(self, data):
    '''
    Add ``data`` to the job queue, guarded by the job queue lock.
    '''
    jobs, guard = self._job_queue, self._job_queue_lock
    self._put_in_queue(data, jobs, guard)
def get_job(self):
    '''
    Take the next entry off the job queue, guarded by the job queue
    lock, and return whatever _pop_off_queue() hands back.
    '''
    jobs, guard = self._job_queue, self._job_queue_lock
    next_job = self._pop_off_queue(jobs, guard)
    return next_job
def put_result(self, data):
    '''
    Add ``data`` to the result queue, guarded by the result queue lock,
    and then set the ``_res_ready`` event.
    '''
    results, guard = self._res_queue, self._res_queue_lock
    self._put_in_queue(data, results, guard)
    # the event is set only after the item is already on the queue
    ready = self._res_ready
    ready.set()
def get_result(self):
    '''
    Take the next entry off the result queue, guarded by the result
    queue lock, and return whatever _pop_off_queue() hands back.
    '''
    results, guard = self._res_queue, self._res_queue_lock
    next_result = self._pop_off_queue(results, guard)
    return next_result
def _initialize_processes(self, num): def _initialize_processes(self, num):
# FIXME: do we need a global lock for workers here instead of a per-worker?
self._workers = [] self._workers = []
# create a dummy object with plugin loaders set as an easier
# way to share them with the forked processes
shared_loader_obj = SharedPluginLoaderObj()
for i in range(num): for i in range(num):
rslt_q = multiprocessing.Queue() w_thread = threading.Thread(target=run_worker, args=(self, shared_loader_obj))
self._workers.append([None, rslt_q]) w_thread.start()
w_lock = threading.Lock()
self._workers.append([w_thread, w_lock])
def _initialize_notified_handlers(self, play): def _initialize_notified_handlers(self, play):
''' '''
@ -312,18 +373,13 @@ class TaskQueueManager:
def cleanup(self): def cleanup(self):
display.debug("RUNNING CLEANUP") display.debug("RUNNING CLEANUP")
self.terminate() self.terminate()
self._final_q.close()
self._cleanup_processes() self._cleanup_processes()
def _cleanup_processes(self): def _cleanup_processes(self):
if hasattr(self, '_workers'): if hasattr(self, '_workers'):
for (worker_prc, rslt_q) in self._workers: for (w_thread, w_lock) in self._workers:
rslt_q.close() if w_thread and not w_thread.is_alive():
if worker_prc and worker_prc.is_alive(): w_thread.join()
try:
worker_prc.terminate()
except AttributeError:
pass
def clear_failed_hosts(self): def clear_failed_hosts(self):
self._failed_hosts = dict() self._failed_hosts = dict()

View file

@ -15,6 +15,7 @@ import sys
import warnings import warnings
from collections import defaultdict from collections import defaultdict
from threading import Lock
from ansible import constants as C from ansible import constants as C
from ansible.plugins import get_plugin_class, MODULE_CACHE, PATH_CACHE, PLUGIN_PATH_CACHE from ansible.plugins import get_plugin_class, MODULE_CACHE, PATH_CACHE, PLUGIN_PATH_CACHE
@ -73,6 +74,7 @@ class PluginLoader:
self._extra_dirs = [] self._extra_dirs = []
self._searched_paths = set() self._searched_paths = set()
self._lock = Lock()
def __setstate__(self, data): def __setstate__(self, data):
''' '''
@ -360,11 +362,14 @@ class PluginLoader:
if path is None: if path is None:
return None return None
self._lock.acquire()
if path not in self._module_cache: if path not in self._module_cache:
self._module_cache[path] = self._load_module_source(name, path) self._module_cache[path] = self._load_module_source(name, path)
found_in_cache = False found_in_cache = False
obj = getattr(self._module_cache[path], self.class_name) obj = getattr(self._module_cache[path], self.class_name)
self._lock.release()
if self.base_class: if self.base_class:
# The import path is hardcoded and should be the right place, # The import path is hardcoded and should be the right place,
# so we are not expecting an ImportError. # so we are not expecting an ImportError.
@ -423,15 +428,18 @@ class PluginLoader:
yield path yield path
continue continue
if path not in self._module_cache:
self._module_cache[path] = self._load_module_source(name, path)
found_in_cache = False
try: try:
self._lock.acquire()
if path not in self._module_cache:
self._module_cache[path] = self._load_module_source(name, path)
found_in_cache = False
obj = getattr(self._module_cache[path], self.class_name) obj = getattr(self._module_cache[path], self.class_name)
except AttributeError as e: except AttributeError as e:
display.warning("Skipping plugin (%s) as it seems to be invalid: %s" % (path, to_text(e))) display.warning("Skipping plugin (%s) as it seems to be invalid: %s" % (path, to_text(e)))
continue continue
finally:
self._lock.release()
if self.base_class: if self.base_class:
# The import path is hardcoded and should be the right place, # The import path is hardcoded and should be the right place,

View file

@ -24,13 +24,11 @@ import threading
import time import time
from collections import deque from collections import deque
from multiprocessing import Lock
from jinja2.exceptions import UndefinedError from jinja2.exceptions import UndefinedError
from ansible import constants as C from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable
from ansible.executor import action_write_locks from ansible.executor import action_write_locks
from ansible.executor.process.worker import WorkerProcess
from ansible.executor.task_result import TaskResult from ansible.executor.task_result import TaskResult
from ansible.inventory.host import Host from ansible.inventory.host import Host
from ansible.module_utils.six.moves import queue as Queue from ansible.module_utils.six.moves import queue as Queue
@ -55,45 +53,6 @@ except ImportError:
__all__ = ['StrategyBase'] __all__ = ['StrategyBase']
class StrategySentinel:
    # Marker type with no behaviour of its own: results_thread_main() leaves
    # its loop when it reads an instance of this class from the queue.
    pass
# TODO: this should probably be in the plugins/__init__.py, with
# a smarter mechanism to set all of the attributes based on
# the loaders created there
class SharedPluginLoaderObj:
    '''
    Plain attribute holder that bundles the plugin loaders so they can
    be handed around as a single object.
    '''

    def __init__(self):
        # every attribute carries the name of the module-level loader it
        # refers to
        (
            self.action_loader,
            self.connection_loader,
            self.filter_loader,
            self.test_loader,
            self.lookup_loader,
            self.module_loader,
        ) = (
            action_loader,
            connection_loader,
            filter_loader,
            test_loader,
            lookup_loader,
            module_loader,
        )
_sentinel = StrategySentinel()
def results_thread_main(strategy):
    '''
    Loop that moves items read from ``strategy._final_q`` onto
    ``strategy._results``.

    The loop ends when a StrategySentinel instance is read from the
    queue or when reading raises IOError/EOFError.

    :arg strategy: object providing ``_final_q``, ``_results`` and
        ``_results_lock``.
    '''
    while True:
        try:
            result = strategy._final_q.get()
            if isinstance(result, StrategySentinel):
                break
            # "with" guarantees the lock is released even if the append
            # raises, unlike a manual acquire()/release() pair
            with strategy._results_lock:
                strategy._results.append(result)
        except (IOError, EOFError):
            break
        except Queue.Empty:
            pass
class StrategyBase: class StrategyBase:
''' '''
@ -102,16 +61,15 @@ class StrategyBase:
''' '''
def __init__(self, tqm): def __init__(self, tqm):
self._tqm = tqm self._tqm = tqm
self._inventory = tqm.get_inventory() self._inventory = tqm.get_inventory()
self._workers = tqm.get_workers() self._workers = tqm._workers
self._notified_handlers = tqm._notified_handlers self._notified_handlers = tqm._notified_handlers
self._listening_handlers = tqm._listening_handlers self._listening_handlers = tqm._listening_handlers
self._variable_manager = tqm.get_variable_manager() self._variable_manager = tqm.get_variable_manager()
self._loader = tqm.get_loader() self._loader = tqm.get_loader()
self._final_q = tqm._final_q self._step = getattr(tqm._options, 'step', False)
self._step = getattr(tqm._options, 'step', False) self._diff = getattr(tqm._options, 'diff', False)
self._diff = getattr(tqm._options, 'diff', False)
# Backwards compat: self._display isn't really needed, just import the global display and use that. # Backwards compat: self._display isn't really needed, just import the global display and use that.
self._display = display self._display = display
@ -124,17 +82,8 @@ class StrategyBase:
# outstanding tasks still in queue # outstanding tasks still in queue
self._blocked_hosts = dict() self._blocked_hosts = dict()
self._results = deque()
self._results_lock = threading.Condition(threading.Lock())
# create the result processing thread for reading results in the background
self._results_thread = threading.Thread(target=results_thread_main, args=(self,))
self._results_thread.daemon = True
self._results_thread.start()
def cleanup(self): def cleanup(self):
self._final_q.put(_sentinel) self._tqm.terminate()
self._results_thread.join()
def run(self, iterator, play_context, result=0): def run(self, iterator, play_context, result=0):
# execute one more pass through the iterator without peeking, to # execute one more pass through the iterator without peeking, to
@ -203,38 +152,11 @@ class StrategyBase:
if task.action not in action_write_locks.action_write_locks: if task.action not in action_write_locks.action_write_locks:
display.debug('Creating lock for %s' % task.action) display.debug('Creating lock for %s' % task.action)
action_write_locks.action_write_locks[task.action] = Lock() action_write_locks.action_write_locks[task.action] = threading.Lock()
# and then queue the new task self._tqm.put_job((host, task, play_context, task_vars))
try: self._pending_results += 1
# create a dummy object with plugin loaders set as an easier
# way to share them with the forked processes
shared_loader_obj = SharedPluginLoaderObj()
queued = False
starting_worker = self._cur_worker
while True:
(worker_prc, rslt_q) = self._workers[self._cur_worker]
if worker_prc is None or not worker_prc.is_alive():
worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
self._workers[self._cur_worker][0] = worker_prc
worker_prc.start()
display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers)))
queued = True
self._cur_worker += 1
if self._cur_worker >= len(self._workers):
self._cur_worker = 0
if queued:
break
elif self._cur_worker == starting_worker:
time.sleep(0.0001)
self._pending_results += 1
except (EOFError, IOError, AssertionError) as e:
# most likely an abort
display.debug("got an error while queuing: %s" % e)
return
display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action)) display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action))
def get_task_hosts(self, iterator, task_host, task): def get_task_hosts(self, iterator, task_host, task):
@ -325,23 +247,21 @@ class StrategyBase:
cur_pass = 0 cur_pass = 0
while True: while True:
try: task_result = self._tqm.get_result()
self._results_lock.acquire() if task_result is None:
task_result = self._results.popleft()
except IndexError:
break break
finally:
self._results_lock.release()
# get the original host and task. We then assign them to the TaskResult for use in callbacks/etc. # get the original host and task. We then assign them to the TaskResult for use in callbacks/etc.
original_host = get_original_host(task_result._host) #original_host = get_original_host(task_result._host)
found_task = iterator.get_original_task(original_host, task_result._task) #found_task = iterator.get_original_task(original_host, task_result._task)
original_task = found_task.copy(exclude_parent=True, exclude_tasks=True) #original_task = found_task.copy(exclude_parent=True, exclude_tasks=True)
original_task._parent = found_task._parent #original_task._parent = found_task._parent
original_task.from_attrs(task_result._task_fields) #original_task.from_attrs(task_result._task_fields)
task_result._host = original_host #task_result._host = original_host
task_result._task = original_task #task_result._task = original_task
original_host = task_result._host
original_task = task_result._task
# get the correct loop var for use later # get the correct loop var for use later
if original_task.loop_control: if original_task.loop_control:
@ -586,7 +506,9 @@ class StrategyBase:
results = self._process_pending_results(iterator) results = self._process_pending_results(iterator)
ret_results.extend(results) ret_results.extend(results)
if self._pending_results > 0: if self._pending_results > 0:
time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL) #time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL)
self._tqm._res_ready.wait()
self._tqm._res_ready.clear()
display.debug("no more pending results, returning what we have") display.debug("no more pending results, returning what we have")