Fix duplicate callback issue in v2

All v2+ callbacks can now optionally define a CALLBACK_TYPE, which
when set to 'stdout' will limit those callbacks which are used for
primary output to a single callback plugin (specified to the
TaskQueueManager object and configurable in ansible.cfg/environment)
This commit is contained in:
James Cammarata 2015-04-29 01:06:33 -05:00
parent dc12669c40
commit 4bb37b82c4
7 changed files with 48 additions and 13 deletions

View file

@ -162,6 +162,7 @@ DEFAULT_CONNECTION_PLUGIN_PATH = get_config(p, DEFAULTS, 'connection_plugins', '
DEFAULT_LOOKUP_PLUGIN_PATH = get_config(p, DEFAULTS, 'lookup_plugins', 'ANSIBLE_LOOKUP_PLUGINS', '~/.ansible/plugins/lookup_plugins:/usr/share/ansible_plugins/lookup_plugins') DEFAULT_LOOKUP_PLUGIN_PATH = get_config(p, DEFAULTS, 'lookup_plugins', 'ANSIBLE_LOOKUP_PLUGINS', '~/.ansible/plugins/lookup_plugins:/usr/share/ansible_plugins/lookup_plugins')
DEFAULT_VARS_PLUGIN_PATH = get_config(p, DEFAULTS, 'vars_plugins', 'ANSIBLE_VARS_PLUGINS', '~/.ansible/plugins/vars_plugins:/usr/share/ansible_plugins/vars_plugins') DEFAULT_VARS_PLUGIN_PATH = get_config(p, DEFAULTS, 'vars_plugins', 'ANSIBLE_VARS_PLUGINS', '~/.ansible/plugins/vars_plugins:/usr/share/ansible_plugins/vars_plugins')
DEFAULT_FILTER_PLUGIN_PATH = get_config(p, DEFAULTS, 'filter_plugins', 'ANSIBLE_FILTER_PLUGINS', '~/.ansible/plugins/filter_plugins:/usr/share/ansible_plugins/filter_plugins') DEFAULT_FILTER_PLUGIN_PATH = get_config(p, DEFAULTS, 'filter_plugins', 'ANSIBLE_FILTER_PLUGINS', '~/.ansible/plugins/filter_plugins:/usr/share/ansible_plugins/filter_plugins')
DEFAULT_STDOUT_CALLBACK = get_config(p, DEFAULTS, 'stdout_callback', 'ANSIBLE_STDOUT_CALLBACK', 'default')
CACHE_PLUGIN = get_config(p, DEFAULTS, 'fact_caching', 'ANSIBLE_CACHE_PLUGIN', 'memory') CACHE_PLUGIN = get_config(p, DEFAULTS, 'fact_caching', 'ANSIBLE_CACHE_PLUGIN', 'memory')
CACHE_PLUGIN_CONNECTION = get_config(p, DEFAULTS, 'fact_caching_connection', 'ANSIBLE_CACHE_PLUGIN_CONNECTION', None) CACHE_PLUGIN_CONNECTION = get_config(p, DEFAULTS, 'fact_caching_connection', 'ANSIBLE_CACHE_PLUGIN_CONNECTION', None)

View file

@ -48,7 +48,7 @@ class PlaybookExecutor:
if options.listhosts or options.listtasks or options.listtags: if options.listhosts or options.listtasks or options.listtags:
self._tqm = None self._tqm = None
else: else:
self._tqm = TaskQueueManager(inventory=inventory, callback='default', variable_manager=variable_manager, loader=loader, display=display, options=options, passwords=self.passwords) self._tqm = TaskQueueManager(inventory=inventory, variable_manager=variable_manager, loader=loader, display=display, options=options, passwords=self.passwords)
def run(self): def run(self):

View file

@ -24,6 +24,7 @@ import os
import socket import socket
import sys import sys
from ansible import constants as C
from ansible.errors import AnsibleError from ansible.errors import AnsibleError
from ansible.executor.connection_info import ConnectionInformation from ansible.executor.connection_info import ConnectionInformation
from ansible.executor.play_iterator import PlayIterator from ansible.executor.play_iterator import PlayIterator
@ -48,7 +49,7 @@ class TaskQueueManager:
which dispatches the Play's tasks to hosts. which dispatches the Play's tasks to hosts.
''' '''
def __init__(self, inventory, callback, variable_manager, loader, display, options, passwords): def __init__(self, inventory, variable_manager, loader, display, options, passwords, stdout_callback=None):
self._inventory = inventory self._inventory = inventory
self._variable_manager = variable_manager self._variable_manager = variable_manager
@ -70,14 +71,8 @@ class TaskQueueManager:
self._final_q = multiprocessing.Queue() self._final_q = multiprocessing.Queue()
# load all available callback plugins # load callback plugins
# FIXME: we need an option to white-list callback plugins self._callback_plugins = self._load_callbacks(stdout_callback)
self._callback_plugins = []
for callback_plugin in callback_loader.all(class_only=True):
if hasattr(callback_plugin, 'CALLBACK_VERSION') and callback_plugin.CALLBACK_VERSION >= 2.0:
self._callback_plugins.append(callback_plugin(self._display))
else:
self._callback_plugins.append(callback_plugin())
# create the pool of worker threads, based on the number of forks specified # create the pool of worker threads, based on the number of forks specified
try: try:
@ -120,6 +115,40 @@ class TaskQueueManager:
for handler in handler_list: for handler in handler_list:
self._notified_handlers[handler.get_name()] = [] self._notified_handlers[handler.get_name()] = []
def _load_callbacks(self, stdout_callback):
'''
Loads all available callbacks, with the exception of those which
utilize the CALLBACK_TYPE option. When CALLBACK_TYPE is set to 'stdout',
only one such callback plugin will be loaded.
'''
loaded_plugins = []
stdout_callback_loaded = False
if stdout_callback is None:
stdout_callback = C.DEFAULT_STDOUT_CALLBACK
if stdout_callback not in callback_loader:
raise AnsibleError("Invalid callback for stdout specified: %s" % stdout_callback)
for callback_plugin in callback_loader.all(class_only=True):
if hasattr(callback_plugin, 'CALLBACK_VERSION') and callback_plugin.CALLBACK_VERSION >= 2.0:
# we only allow one callback of type 'stdout' to be loaded, so check
# the name of the current plugin and type to see if we need to skip
# loading this callback plugin
callback_type = getattr(callback_plugin, 'CALLBACK_TYPE', None)
(callback_name, _) = os.path.splitext(os.path.basename(callback_plugin._original_path))
if callback_type == 'stdout':
if callback_name != stdout_callback or stdout_callback_loaded:
continue
stdout_callback_loaded = True
loaded_plugins.append(callback_plugin(self._display))
else:
loaded_plugins.append(callback_plugin())
return loaded_plugins
def run(self, play): def run(self, play):
''' '''
Iterates over the roles/tasks in a play, using the given (or default) Iterates over the roles/tasks in a play, using the given (or default)

View file

@ -243,9 +243,12 @@ class PluginLoader:
if path not in self._module_cache: if path not in self._module_cache:
self._module_cache[path] = imp.load_source('.'.join([self.package, name]), path) self._module_cache[path] = imp.load_source('.'.join([self.package, name]), path)
if kwargs.get('class_only', False): if kwargs.get('class_only', False):
yield getattr(self._module_cache[path], self.class_name) obj = getattr(self._module_cache[path], self.class_name)
else: else:
yield getattr(self._module_cache[path], self.class_name)(*args, **kwargs) obj = getattr(self._module_cache[path], self.class_name)(*args, **kwargs)
# set extra info on the module, in case we want it later
setattr(obj, '_original_path', path)
yield obj
action_loader = PluginLoader( action_loader = PluginLoader(
'ActionModule', 'ActionModule',

View file

@ -31,6 +31,7 @@ class CallbackModule(CallbackBase):
''' '''
CALLBACK_VERSION = 2.0 CALLBACK_VERSION = 2.0
CALLBACK_TYPE = 'stdout'
def v2_on_any(self, *args, **kwargs): def v2_on_any(self, *args, **kwargs):
pass pass

View file

@ -32,6 +32,7 @@ class CallbackModule(CallbackBase):
''' '''
CALLBACK_VERSION = 2.0 CALLBACK_VERSION = 2.0
CALLBACK_TYPE = 'stdout'
def v2_on_any(self, *args, **kwargs): def v2_on_any(self, *args, **kwargs):
pass pass

View file

@ -150,7 +150,7 @@ class Cli(object):
# now create a task queue manager to execute the play # now create a task queue manager to execute the play
try: try:
display = Display() display = Display()
tqm = TaskQueueManager(inventory=inventory, callback='minimal', variable_manager=variable_manager, loader=loader, display=display, options=options, passwords=passwords) tqm = TaskQueueManager(inventory=inventory, variable_manager=variable_manager, loader=loader, display=display, options=options, passwords=passwords, stdout_callback='minimal')
result = tqm.run(play) result = tqm.run(play)
tqm.cleanup() tqm.cleanup()
except AnsibleError: except AnsibleError: