Fix the mapping of module_name to Locks
This was reinitialized every time we forked before so we weren't sharing the same Locks. It also was not accounting for modules which were directly invoked by an action plugin instead of going through the strategy plguins.
This commit is contained in:
parent
d5585220a4
commit
5909a4473d
2 changed files with 55 additions and 15 deletions
|
@ -34,7 +34,10 @@ from ansible.release import __version__, __author__
|
||||||
from ansible import constants as C
|
from ansible import constants as C
|
||||||
from ansible.errors import AnsibleError
|
from ansible.errors import AnsibleError
|
||||||
from ansible.utils.unicode import to_bytes, to_unicode
|
from ansible.utils.unicode import to_bytes, to_unicode
|
||||||
from ansible.plugins.strategy import action_write_locks
|
# Must import strategy and use write_locks from there
|
||||||
|
# If we import write_locks directly then we end up binding a
|
||||||
|
# variable to the object and then it never gets updated.
|
||||||
|
from ansible.plugins import strategy
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from __main__ import display
|
from __main__ import display
|
||||||
|
@ -552,14 +555,29 @@ def _find_snippet_imports(module_name, module_data, module_path, module_args, ta
|
||||||
zipdata = None
|
zipdata = None
|
||||||
# Optimization -- don't lock if the module has already been cached
|
# Optimization -- don't lock if the module has already been cached
|
||||||
if os.path.exists(cached_module_filename):
|
if os.path.exists(cached_module_filename):
|
||||||
|
display.debug('ZIPLOADER: using cached module: %s' % cached_module_filename)
|
||||||
zipdata = open(cached_module_filename, 'rb').read()
|
zipdata = open(cached_module_filename, 'rb').read()
|
||||||
# Fool the check later... I think we should just remove the check
|
# Fool the check later... I think we should just remove the check
|
||||||
py_module_names.add(('basic',))
|
py_module_names.add(('basic',))
|
||||||
else:
|
else:
|
||||||
with action_write_locks[module_name]:
|
if module_name in strategy.action_write_locks:
|
||||||
|
display.debug('ZIPLOADER: Using lock for %s' % module_name)
|
||||||
|
lock = strategy.action_write_locks[module_name]
|
||||||
|
else:
|
||||||
|
# If the action plugin directly invokes the module (instead of
|
||||||
|
# going through a strategy) then we don't have a cross-process
|
||||||
|
# Lock specifically for this module. Use the "unexpected
|
||||||
|
# module" lock instead
|
||||||
|
display.debug('ZIPLOADER: Using generic lock for %s' % module_name)
|
||||||
|
lock = strategy.action_write_locks[None]
|
||||||
|
|
||||||
|
display.debug('ZIPLOADER: Acquiring lock')
|
||||||
|
with lock:
|
||||||
|
display.debug('ZIPLOADER: Lock acquired: %s' % id(lock))
|
||||||
# Check that no other process has created this while we were
|
# Check that no other process has created this while we were
|
||||||
# waiting for the lock
|
# waiting for the lock
|
||||||
if not os.path.exists(cached_module_filename):
|
if not os.path.exists(cached_module_filename):
|
||||||
|
display.debug('ZIPLOADER: Creating module')
|
||||||
# Create the module zip data
|
# Create the module zip data
|
||||||
zipoutput = BytesIO()
|
zipoutput = BytesIO()
|
||||||
zf = zipfile.ZipFile(zipoutput, mode='w', compression=compression_method)
|
zf = zipfile.ZipFile(zipoutput, mode='w', compression=compression_method)
|
||||||
|
@ -580,15 +598,19 @@ def _find_snippet_imports(module_name, module_data, module_path, module_args, ta
|
||||||
# Note -- if we have a global function to setup, that would
|
# Note -- if we have a global function to setup, that would
|
||||||
# be a better place to run this
|
# be a better place to run this
|
||||||
os.mkdir(lookup_path)
|
os.mkdir(lookup_path)
|
||||||
|
display.debug('ZIPLOADER: Writing module')
|
||||||
with open(cached_module_filename + '-part', 'w') as f:
|
with open(cached_module_filename + '-part', 'w') as f:
|
||||||
f.write(zipdata)
|
f.write(zipdata)
|
||||||
|
|
||||||
# Rename the file into its final position in the cache so
|
# Rename the file into its final position in the cache so
|
||||||
# future users of this module can read it off the
|
# future users of this module can read it off the
|
||||||
# filesystem instead of constructing from scratch.
|
# filesystem instead of constructing from scratch.
|
||||||
|
display.debug('ZIPLOADER: Renaming module')
|
||||||
os.rename(cached_module_filename + '-part', cached_module_filename)
|
os.rename(cached_module_filename + '-part', cached_module_filename)
|
||||||
|
display.debug('ZIPLOADER: Done creating module')
|
||||||
|
|
||||||
if zipdata is None:
|
if zipdata is None:
|
||||||
|
display.debug('ZIPLOADER: Reading module after lock')
|
||||||
# Another process wrote the file while we were waiting for
|
# Another process wrote the file while we were waiting for
|
||||||
# the write lock. Go ahead and read the data from disk
|
# the write lock. Go ahead and read the data from disk
|
||||||
# instead of re-creating it.
|
# instead of re-creating it.
|
||||||
|
|
|
@ -36,6 +36,7 @@ from ansible.executor.process.worker import WorkerProcess
|
||||||
from ansible.executor.task_result import TaskResult
|
from ansible.executor.task_result import TaskResult
|
||||||
from ansible.inventory.host import Host
|
from ansible.inventory.host import Host
|
||||||
from ansible.inventory.group import Group
|
from ansible.inventory.group import Group
|
||||||
|
from ansible.module_utils.facts import Facts
|
||||||
from ansible.playbook.helpers import load_list_of_blocks
|
from ansible.playbook.helpers import load_list_of_blocks
|
||||||
from ansible.playbook.included_file import IncludedFile
|
from ansible.playbook.included_file import IncludedFile
|
||||||
from ansible.plugins import action_loader, connection_loader, filter_loader, lookup_loader, module_loader, test_loader
|
from ansible.plugins import action_loader, connection_loader, filter_loader, lookup_loader, module_loader, test_loader
|
||||||
|
@ -52,8 +53,24 @@ except ImportError:
|
||||||
|
|
||||||
__all__ = ['StrategyBase']
|
__all__ = ['StrategyBase']
|
||||||
|
|
||||||
action_write_locks = defaultdict(Lock)
|
if 'action_write_locks' not in globals():
|
||||||
|
# Do not initialize this more than once because it seems to bash
|
||||||
|
# the existing one. multiprocessing must be reloading the module
|
||||||
|
# when it forks?
|
||||||
|
action_write_locks = dict()
|
||||||
|
|
||||||
|
# Below is a Lock for use when we weren't expecting a named module.
|
||||||
|
# It gets used when an action plugin directly invokes a module instead
|
||||||
|
# of going through the strategies. Slightly less efficient as all
|
||||||
|
# processes with unexpected module names will wait on this lock
|
||||||
|
action_write_locks[None] = Lock()
|
||||||
|
|
||||||
|
# These plugins are called directly by action plugins (not going through
|
||||||
|
# a strategy). We precreate them here as an optimization
|
||||||
|
mods = set(p['name'] for p in Facts.PKG_MGRS)
|
||||||
|
mods.update(('copy', 'file', 'setup', 'slurp', 'stat'))
|
||||||
|
for mod_name in mods:
|
||||||
|
action_write_locks[mod_name] = Lock()
|
||||||
|
|
||||||
# TODO: this should probably be in the plugins/__init__.py, with
|
# TODO: this should probably be in the plugins/__init__.py, with
|
||||||
# a smarter mechanism to set all of the attributes based on
|
# a smarter mechanism to set all of the attributes based on
|
||||||
|
@ -144,18 +161,19 @@ class StrategyBase:
|
||||||
|
|
||||||
display.debug("entering _queue_task() for %s/%s" % (host, task))
|
display.debug("entering _queue_task() for %s/%s" % (host, task))
|
||||||
|
|
||||||
# Add a write lock for tasks.
|
# Add a write lock for tasks.
|
||||||
# Maybe this should be added somewhere further up the call stack but
|
# Maybe this should be added somewhere further up the call stack but
|
||||||
# this is the earliest in the code where we have task (1) extracted
|
# this is the earliest in the code where we have task (1) extracted
|
||||||
# into its own variable and (2) there's only a single code path
|
# into its own variable and (2) there's only a single code path
|
||||||
# leading to the module being run. This is called by three
|
# leading to the module being run. This is called by three
|
||||||
# functions: __init__.py::_do_handler_run(), linear.py::run(), and
|
# functions: __init__.py::_do_handler_run(), linear.py::run(), and
|
||||||
# free.py::run() so we'd have to add to all three to do it there.
|
# free.py::run() so we'd have to add to all three to do it there.
|
||||||
# The next common higher level is __init__.py::run() and that has
|
# The next common higher level is __init__.py::run() and that has
|
||||||
# tasks inside of play_iterator so we'd have to extract them to do it
|
# tasks inside of play_iterator so we'd have to extract them to do it
|
||||||
# there.
|
# there.
|
||||||
if not action_write_locks[task.action]:
|
global action_write_locks
|
||||||
display.warning('Python defaultdict did not create the Lock for us. Creating manually')
|
if task.action not in action_write_locks:
|
||||||
|
display.debug('Creating lock for %s' % task.action)
|
||||||
action_write_locks[task.action] = Lock()
|
action_write_locks[task.action] = Lock()
|
||||||
|
|
||||||
# and then queue the new task
|
# and then queue the new task
|
||||||
|
|
Loading…
Reference in a new issue