Fixing up performance
commit 63c47fb271 (parent c64ac90560)
10 changed files with 62 additions and 26 deletions

@@ -21,6 +21,7 @@ __metaclass__ = type

 from ansible.compat.six.moves import queue

+import json
 import multiprocessing
 import os
 import signal

@@ -43,6 +44,7 @@ from ansible.executor.task_executor import TaskExecutor
 from ansible.executor.task_result import TaskResult
 from ansible.playbook.handler import Handler
 from ansible.playbook.task import Task
+from ansible.vars.unsafe_proxy import AnsibleJSONUnsafeDecoder

 from ansible.utils.debug import debug

@@ -59,9 +61,9 @@ class WorkerProcess(multiprocessing.Process):
     def __init__(self, tqm, main_q, rslt_q, loader):

         # takes a task queue manager as the sole param:
         self._main_q = main_q
         self._rslt_q = rslt_q
         self._loader = loader

         # dupe stdin, if we have one
         self._new_stdin = sys.stdin

@@ -97,9 +99,9 @@ class WorkerProcess(multiprocessing.Process):
         while True:
             task = None
             try:
+                debug("waiting for a message...")
                 (host, task, basedir, job_vars, play_context, shared_loader_obj) = self._main_q.get()
-                debug("there's work to be done!")
-                debug("got a task/handler to work on: %s" % task)
+                debug("there's work to be done! got a task/handler to work on: %s" % task)

                 # because the task queue manager starts workers (forks) before the
                 # playbook is loaded, set the basedir of the loader inherted by

@@ -114,7 +116,15 @@ class WorkerProcess(multiprocessing.Process):

                 # execute the task and build a TaskResult from the result
                 debug("running TaskExecutor() for %s/%s" % (host, task))
-                executor_result = TaskExecutor(host, task, job_vars, play_context, self._new_stdin, self._loader, shared_loader_obj).run()
+                executor_result = TaskExecutor(
+                    host,
+                    task,
+                    job_vars,
+                    play_context,
+                    self._new_stdin,
+                    self._loader,
+                    shared_loader_obj,
+                ).run()
                 debug("done running TaskExecutor() for %s/%s" % (host, task))
                 task_result = TaskResult(host, task, executor_result)

@@ -156,7 +156,8 @@ class TaskExecutor:
         # create a copy of the job vars here so that we can modify
         # them temporarily without changing them too early for other
         # parts of the code that might still need a pristine version
-        vars_copy = self._job_vars.copy()
+        #vars_copy = self._job_vars.copy()
+        vars_copy = self._job_vars

         # now we update them with the play context vars
         self._play_context.update_vars(vars_copy)

@@ -196,7 +197,8 @@ class TaskExecutor:

         # make copies of the job vars and task so we can add the item to
         # the variables and re-validate the task with the item variable
-        task_vars = self._job_vars.copy()
+        #task_vars = self._job_vars.copy()
+        task_vars = self._job_vars

         items = self._squash_items(items, task_vars)
         for item in items:

@@ -357,7 +359,8 @@ class TaskExecutor:

         # make a copy of the job vars here, in case we need to update them
         # with the registered variable value later on when testing conditions
-        vars_copy = variables.copy()
+        #vars_copy = variables.copy()
+        vars_copy = variables

         self._display.debug("starting attempt loop")
         result = None

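Aside (not part of the diff): the three TaskExecutor hunks above swap shallow dict copies for a shared reference. A minimal sketch of the trade-off, using a hypothetical job-vars dict:

    # Sharing the reference avoids a per-task/per-item shallow copy of a
    # potentially large vars dict, but any mutation now leaks back to the
    # caller's dict.
    job_vars = {'ansible_host': '10.0.0.1', 'http_port': 8080}

    vars_copy = job_vars              # new behaviour: share the reference
    vars_copy['item'] = 'foo'
    assert 'item' in job_vars         # the original dict changed too

    vars_snapshot = job_vars.copy()   # old behaviour: isolated shallow copy
    vars_snapshot['item'] = 'bar'
    assert job_vars['item'] == 'foo'  # original untouched by this write
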
@@ -96,7 +96,7 @@ class TaskQueueManager:
        self._workers = []

    def _initialize_workers(self, num):
-       for i in range(num):
+       for i in xrange(num_workers):
            main_q = multiprocessing.Queue()
            rslt_q = multiprocessing.Queue()

@@ -77,6 +77,13 @@ class Inventory(object):

        self.parse_inventory(host_list)

+   def serialize(self):
+       data = dict()
+       return data
+
+   def deserialize(self, data):
+       pass
+
    def parse_inventory(self, host_list):

        if isinstance(host_list, string_types):

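Aside (not part of the diff): serialize()/deserialize() land here only as stubs. A hypothetical sketch of how such hooks usually get fleshed out, with made-up class and field names:

    class SerializableThing(object):
        def __init__(self, host_list):
            self._host_list = host_list

        def serialize(self):
            # collect whatever state is needed to rebuild the object on the
            # other side of a fork or queue
            return dict(host_list=self._host_list)

        def deserialize(self, data):
            self._host_list = data.get('host_list', [])
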
@@ -686,8 +693,6 @@ class Inventory(object):
        basedirs = [self._playbook_basedir]

        for basedir in basedirs:
-           display.debug('getting vars from %s' % basedir)
-
            # this can happen from particular API usages, particularly if not run
            # from /usr/bin/ansible-playbook
            if basedir in ('', None):

@@ -22,6 +22,9 @@ __metaclass__ = type
 from ansible.compat.six.moves import queue as Queue
 from ansible.compat.six import iteritems, text_type, string_types

+import json
+import pickle
+import sys
 import time

 from jinja2.exceptions import UndefinedError

@@ -37,7 +40,7 @@ from ansible.playbook.included_file import IncludedFile
 from ansible.playbook.role import hash_params
 from ansible.plugins import action_loader, connection_loader, filter_loader, lookup_loader, module_loader, test_loader
 from ansible.template import Templar
-from ansible.vars.unsafe_proxy import wrap_var
+from ansible.vars.unsafe_proxy import wrap_var, AnsibleJSONUnsafeEncoder

 try:
     from __main__ import display

@@ -127,11 +130,8 @@ class StrategyBase:
        Base class method to add extra variables/information to the list of task
        vars sent through the executor engine regarding the task queue manager state.
        '''
-       new_vars = vars.copy()
-       new_vars['ansible_current_hosts'] = self.get_hosts_remaining(play)
-       new_vars['ansible_failed_hosts'] = self.get_failed_hosts(play)
-       return new_vars
+       vars['ansible_current_hosts'] = [h.name for h in self.get_hosts_remaining(play)]
+       vars['ansible_failed_hosts'] = [h.name for h in self.get_failed_hosts(play)]

    def _queue_task(self, host, task, task_vars, play_context):
        ''' handles queueing the task up to be sent to a worker '''

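Aside (not part of the diff): add_tqm_variables now annotates the dict it is given instead of building and returning a copy, which is why the call sites below drop the task_vars = ... assignment. A small sketch of that calling convention, with made-up host names:

    def get_hosts_remaining():
        return ['web1', 'web2']          # stand-in for the real TQM lookup

    def add_tqm_variables(vars):
        # mutate the caller's dict in place; nothing is returned
        vars['ansible_current_hosts'] = [h for h in get_hosts_remaining()]

    task_vars = {'http_port': 8080}
    add_tqm_variables(task_vars)         # note: no reassignment needed
    assert task_vars['ansible_current_hosts'] == ['web1', 'web2']
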
@@ -263,7 +263,7 @@ class StrategyBase:

            if task.delegate_to is not None:
                task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task)
-               task_vars = self.add_tqm_variables(task_vars, play=iterator._play)
+               self.add_tqm_variables(task_vars, play=iterator._play)
                if item is not None:
                    task_vars['item'] = item
                templar = Templar(loader=self._loader, variables=task_vars)

@@ -516,7 +516,7 @@ class StrategyBase:
        for host in notified_hosts:
            if not handler.has_triggered(host) and (host.name not in self._tqm._failed_hosts or play_context.force_handlers):
                task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=handler)
-               task_vars = self.add_tqm_variables(task_vars, play=iterator._play)
+               self.add_tqm_variables(task_vars, play=iterator._play)
                self._queue_task(host, handler, task_vars, play_context)
                if run_once:
                    break

@@ -211,7 +211,7 @@ class StrategyModule(StrategyBase):

                    self._display.debug("getting variables")
                    task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task)
-                   task_vars = self.add_tqm_variables(task_vars, play=iterator._play)
+                   self.add_tqm_variables(task_vars, play=iterator._play)
                    templar = Templar(loader=self._loader, variables=task_vars)
                    self._display.debug("done getting variables")

@@ -258,7 +258,7 @@ class Templar:
        '''

        assert isinstance(variables, dict)
-       self._available_variables = variables.copy()
+       self._available_variables = variables

    def template(self, variable, convert_bare=False, preserve_trailing_newlines=True, escape_backslashes=True, fail_on_undefined=None, overrides=None, convert_data=True):
        '''

@@ -83,7 +83,10 @@ class AnsibleJ2Vars:
        if isinstance(variable, dict) and varname == "vars" or isinstance(variable, HostVars):
            return variable
        else:
-           return self._templar.template(variable)
+           if self._templar._contains_vars(variable):
+               return self._templar.template(variable)
+           else:
+               return variable

    def add_locals(self, locals):
        '''

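Aside (not part of the diff): the new branch only invokes the templar when a value actually contains template syntax. A rough approximation of the kind of check _contains_vars performs (the real Templar method may differ):

    def contains_vars(data):
        # treat a value as templatable only if a Jinja2 delimiter appears
        if isinstance(data, str):
            return any(marker in data for marker in ('{{', '{%', '{#'))
        return False

    assert contains_vars('{{ http_port }}')
    assert not contains_vars('plain string')   # returned untouched above
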
@@ -198,7 +198,7 @@ class VariableManager:
            debug("vars are cached, returning them now")
            return VARIABLE_CACHE[cache_entry]

-       all_vars = defaultdict(dict)
+       all_vars = dict()
        magic_variables = self._get_magic_variables(
            loader=loader,
            play=play,

@@ -53,11 +53,12 @@
 from __future__ import (absolute_import, division, print_function)
 __metaclass__ = type

+import json

 from ansible.utils.unicode import to_unicode
 from ansible.compat.six import string_types, text_type

-__all__ = ['UnsafeProxy', 'AnsibleUnsafe', 'wrap_var']
+__all__ = ['UnsafeProxy', 'AnsibleUnsafe', 'AnsibleJSONUnsafeEncoder', 'AnsibleJSONUnsafeDecoder', 'wrap_var']

 class AnsibleUnsafe(object):
     __UNSAFE__ = True

@@ -76,6 +77,20 @@ class UnsafeProxy(object):
            return AnsibleUnsafeText(obj)
        return obj

+class AnsibleJSONUnsafeEncoder(json.JSONEncoder):
+    def encode(self, obj):
+        if isinstance(obj, AnsibleUnsafe):
+            return super(AnsibleJSONUnsafeEncoder, self).encode(dict(__ansible_unsafe=True, value=unicode(obj)))
+        else:
+            return super(AnsibleJSONUnsafeEncoder, self).encode(obj)
+
+class AnsibleJSONUnsafeDecoder(json.JSONDecoder):
+    def decode(self, obj):
+        value = super(AnsibleJSONUnsafeDecoder, self).decode(obj)
+        if isinstance(value, dict) and '__ansible_unsafe' in value:
+            return UnsafeProxy(value.get('value', ''))
+        else:
+            return value

 def _wrap_dict(v):
     for k in v.keys():
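
Aside (not part of the diff): the encoder/decoder pair added above plugs into the standard json cls= hooks, presumably so unsafe values survive the trip through the worker queues (the strategy side imports the encoder, the worker side the decoder). A sketch of the round trip (Python 2, since the encoder calls unicode()):

    import json

    from ansible.vars.unsafe_proxy import (AnsibleJSONUnsafeDecoder,
                                           AnsibleJSONUnsafeEncoder, wrap_var)

    unsafe = wrap_var(u'{{ do_not_template_me }}')        # AnsibleUnsafeText
    payload = json.dumps(unsafe, cls=AnsibleJSONUnsafeEncoder)
    # payload now looks like:
    #   {"__ansible_unsafe": true, "value": "{{ do_not_template_me }}"}

    restored = json.loads(payload, cls=AnsibleJSONUnsafeDecoder)
    # restored is re-wrapped via UnsafeProxy, so the templar will still
    # refuse to evaluate it on the receiving side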