From 5cbeab5a3cbd55a7252c94c74c57ca69aade7846 Mon Sep 17 00:00:00 2001
From: James Cammarata <jimi@sngx.net>
Date: Mon, 16 Nov 2015 16:12:57 -0500
Subject: [PATCH] Performance improvements for HostVars and some bugfixes

---
 lib/ansible/executor/process/result.py     |  6 +-
 lib/ansible/executor/process/worker.py     | 15 +++--
 lib/ansible/executor/task_queue_manager.py | 38 +++++++++--
 lib/ansible/plugins/strategy/__init__.py   | 14 ++--
 lib/ansible/vars/__init__.py               | 18 ++---
 lib/ansible/vars/hostvars.py               | 78 ++++++++--------------
 6 files changed, 85 insertions(+), 84 deletions(-)

diff --git a/lib/ansible/executor/process/result.py b/lib/ansible/executor/process/result.py
index 2d13aa44cda..cdc8875631d 100644
--- a/lib/ansible/executor/process/result.py
+++ b/lib/ansible/executor/process/result.py
@@ -58,7 +58,7 @@ class ResultProcess(multiprocessing.Process):
 
     def _send_result(self, result):
         debug(u"sending result: %s" % ([text_type(x) for x in result],))
-        self._final_q.put(result, block=False)
+        self._final_q.put(result)
         debug("done sending result")
 
     def _read_worker_result(self):
@@ -73,7 +73,7 @@ class ResultProcess(multiprocessing.Process):
             try:
                 if not rslt_q.empty():
                     debug("worker %d has data to read" % self._cur_worker)
-                    result = rslt_q.get(block=False)
+                    result = rslt_q.get()
                     debug("got a result from worker %d: %s" % (self._cur_worker, result))
                     break
             except queue.Empty:
@@ -101,7 +101,7 @@ class ResultProcess(multiprocessing.Process):
             try:
                 result = self._read_worker_result()
                 if result is None:
-                    time.sleep(0.01)
+                    time.sleep(0.0001)
                     continue
 
                 clean_copy = strip_internal_keys(result._result)
diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py
index 1cc1f7df438..a1a83a5ddaa 100644
--- a/lib/ansible/executor/process/worker.py
+++ b/lib/ansible/executor/process/worker.py
@@ -59,11 +59,13 @@ class WorkerProcess(multiprocessing.Process):
     for reading later.
     '''
 
-    def __init__(self, tqm, main_q, rslt_q, loader):
+    def __init__(self, tqm, main_q, rslt_q, hostvars_manager, loader):
 
+        super(WorkerProcess, self).__init__()
         # takes a task queue manager as the sole param:
         self._main_q   = main_q
         self._rslt_q   = rslt_q
+        self._hostvars = hostvars_manager
         self._loader   = loader
 
         # dupe stdin, if we have one
@@ -82,8 +84,6 @@ class WorkerProcess(multiprocessing.Process):
             # couldn't get stdin's fileno, so we just carry on
             pass
 
-        super(WorkerProcess, self).__init__()
-
     def run(self):
         '''
         Called when the process is started, and loops indefinitely
@@ -100,14 +100,15 @@ class WorkerProcess(multiprocessing.Process):
         while True:
             task = None
             try:
-                debug("waiting for a message...")
-                (host, task, basedir, zip_vars, hostvars, compressed_vars, play_context, shared_loader_obj) = self._main_q.get()
+                #debug("waiting for work")
+                (host, task, basedir, zip_vars, compressed_vars, play_context, shared_loader_obj) = self._main_q.get(block=False)
 
                 if compressed_vars:
                     job_vars = json.loads(zlib.decompress(zip_vars))
                 else:
                     job_vars = zip_vars
-                job_vars['hostvars'] = hostvars
+
+                job_vars['hostvars'] = self._hostvars.hostvars()
 
                 debug("there's work to be done! got a task/handler to work on: %s" % task)
 
@@ -142,7 +143,7 @@ class WorkerProcess(multiprocessing.Process):
                 debug("done sending task result")
 
             except queue.Empty:
-                pass
+                time.sleep(0.0001)
             except AnsibleConnectionFailure:
                 try:
                     if task:
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index 001d71e9e0c..3e62cb3c99d 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -19,6 +19,7 @@
 from __future__ import (absolute_import, division, print_function)
 __metaclass__ = type
 
+from multiprocessing.managers import SyncManager, DictProxy
 import multiprocessing
 import os
 import tempfile
@@ -32,6 +33,7 @@ from ansible.executor.stats import AggregateStats
 from ansible.playbook.play_context import PlayContext
 from ansible.plugins import callback_loader, strategy_loader, module_loader
 from ansible.template import Templar
+from ansible.vars.hostvars import HostVars
 
 try:
     from __main__ import display
@@ -98,7 +100,7 @@ class TaskQueueManager:
             main_q = multiprocessing.Queue()
             rslt_q = multiprocessing.Queue()
 
-            prc = WorkerProcess(self, main_q, rslt_q, self._loader)
+            prc = WorkerProcess(self, main_q, rslt_q, self._hostvars_manager, self._loader)
             prc.start()
 
             self._workers.append((prc, main_q, rslt_q))
@@ -173,11 +175,6 @@ class TaskQueueManager:
         are done with the current task).
         '''
 
-        # Fork # of forks, # of hosts or serial, whichever is lowest
-        contenders =  [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))]
-        contenders =  [ v for v in contenders if v is not None and v > 0 ]
-        self._initialize_processes(min(contenders))
-
         if not self._callbacks_loaded:
             self.load_callbacks()
 
@@ -187,6 +184,34 @@ class TaskQueueManager:
         new_play = play.copy()
         new_play.post_validate(templar)
 
+        class HostVarsManager(SyncManager):
+            pass
+
+        hostvars = HostVars(
+            play=new_play,
+            inventory=self._inventory,
+            variable_manager=self._variable_manager,
+            loader=self._loader,
+        )
+
+        HostVarsManager.register(
+            'hostvars',
+            callable=lambda: hostvars,
+            # FIXME: this is the list of exposed methods to the DictProxy object, plus our
+            #        one special one (set_variable_manager). There's probably a better way
+            #        to do this with a proper BaseProxy/DictProxy derivative
+            exposed=('set_variable_manager', '__contains__', '__delitem__', '__getitem__',
+                     '__len__', '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
+                     'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'),
+        )
+        self._hostvars_manager = HostVarsManager()
+        self._hostvars_manager.start()
+
+        # Fork # of forks, # of hosts or serial, whichever is lowest
+        contenders =  [self._options.forks, play.serial, len(self._inventory.get_hosts(new_play.hosts))]
+        contenders =  [ v for v in contenders if v is not None and v > 0 ]
+        self._initialize_processes(min(contenders))
+
         play_context = PlayContext(new_play, self._options, self.passwords, self._connection_lockfile.fileno())
         for callback_plugin in self._callback_plugins:
             if hasattr(callback_plugin, 'set_play_context'):
@@ -221,6 +246,7 @@ class TaskQueueManager:
         # and run the play using the strategy and cleanup on way out
         play_return = strategy.run(iterator, play_context)
         self._cleanup_processes()
+        self._hostvars_manager.shutdown()
         return play_return
 
     def cleanup(self):
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index 3cdec5b573c..f1f46505292 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -158,7 +158,6 @@ class StrategyBase:
             # hostvars out of the task variables right now, due to the fact
             # that they're not JSON serializable
             compressed_vars = False
-            hostvars = task_vars.pop('hostvars', None)
             if C.DEFAULT_VAR_COMPRESSION_LEVEL > 0:
                 zip_vars = zlib.compress(json.dumps(task_vars), C.DEFAULT_VAR_COMPRESSION_LEVEL)
                 compressed_vars = True
@@ -170,10 +169,7 @@ class StrategyBase:
                 zip_vars = task_vars  # noqa (pyflakes false positive because task_vars is deleted in the conditional above)
 
             # and queue the task
-            main_q.put((host, task, self._loader.get_basedir(), zip_vars, hostvars, compressed_vars, play_context, shared_loader_obj), block=False)
-
-            # nuke the hostvars object too, as its no longer needed
-            del hostvars
+            main_q.put((host, task, self._loader.get_basedir(), zip_vars, compressed_vars, play_context, shared_loader_obj))
 
             self._pending_results += 1
         except (EOFError, IOError, AssertionError) as e:
@@ -192,7 +188,7 @@ class StrategyBase:
 
         while not self._final_q.empty() and not self._tqm._terminated:
             try:
-                result = self._final_q.get(block=False)
+                result = self._final_q.get()
                 display.debug("got result from result worker: %s" % ([text_type(x) for x in result],))
 
                 # all host status messages contain 2 entries: (msg, task_result)
@@ -277,6 +273,7 @@ class StrategyBase:
                     var_value = wrap_var(result[3])
 
                     self._variable_manager.set_nonpersistent_facts(host, {var_name: var_value})
+                    self._tqm._hostvars_manager.hostvars().set_variable_manager(self._variable_manager)
 
                 elif result[0] in ('set_host_var', 'set_host_facts'):
                     host = result[1]
@@ -307,11 +304,12 @@ class StrategyBase:
                             self._variable_manager.set_nonpersistent_facts(target_host, facts)
                         else:
                             self._variable_manager.set_host_facts(target_host, facts)
+                    self._tqm._hostvars_manager.hostvars().set_variable_manager(self._variable_manager)
 
                 else:
                     raise AnsibleError("unknown result message received: %s" % result[0])
             except Queue.Empty:
-                time.sleep(0.01)
+                time.sleep(0.0001)
 
         return ret_results
 
@@ -327,7 +325,7 @@ class StrategyBase:
         while self._pending_results > 0 and not self._tqm._terminated:
             results = self._process_pending_results(iterator)
             ret_results.extend(results)
-            time.sleep(0.01)
+            time.sleep(0.0001)
         display.debug("no more pending results, returning what we have")
 
         return ret_results
diff --git a/lib/ansible/vars/__init__.py b/lib/ansible/vars/__init__.py
index 26f52adfb00..c895b59f5fb 100644
--- a/lib/ansible/vars/__init__.py
+++ b/lib/ansible/vars/__init__.py
@@ -359,15 +359,15 @@ class VariableManager:
                 for (group_name, group) in iteritems(self._inventory.groups):
                     variables['groups'][group_name] = [h.name for h in group.get_hosts()]
 
-                if include_hostvars:
-                    hostvars_cache_entry = self._get_cache_entry(play=play)
-                    if hostvars_cache_entry in HOSTVARS_CACHE:
-                        hostvars = HOSTVARS_CACHE[hostvars_cache_entry]
-                    else:
-                        hostvars = HostVars(play=play, inventory=self._inventory, loader=loader, variable_manager=self)
-                        HOSTVARS_CACHE[hostvars_cache_entry] = hostvars
-                    variables['hostvars'] = hostvars
-                    variables['vars'] = hostvars[host.get_name()]
+                #if include_hostvars:
+                #    hostvars_cache_entry = self._get_cache_entry(play=play)
+                #    if hostvars_cache_entry in HOSTVARS_CACHE:
+                #        hostvars = HOSTVARS_CACHE[hostvars_cache_entry]
+                #    else:
+                #        hostvars = HostVars(play=play, inventory=self._inventory, loader=loader, variable_manager=self)
+                #        HOSTVARS_CACHE[hostvars_cache_entry] = hostvars
+                #    variables['hostvars'] = hostvars
+                #    variables['vars'] = hostvars[host.get_name()]
 
         if play:
             variables['role_names'] = [r._role_name for r in play.roles]
diff --git a/lib/ansible/vars/hostvars.py b/lib/ansible/vars/hostvars.py
index de279840395..246b2c78123 100644
--- a/lib/ansible/vars/hostvars.py
+++ b/lib/ansible/vars/hostvars.py
@@ -48,74 +48,50 @@ class HostVars(collections.Mapping):
 
     def __init__(self, play, inventory, variable_manager, loader):
         self._lookup = dict()
+        self._inventory = inventory
         self._loader = loader
         self._play = play
         self._variable_manager = variable_manager
         self._cached_result = dict()
 
-        hosts = inventory.get_hosts(ignore_limits_and_restrictions=True)
+    def set_variable_manager(self, variable_manager):
+        self._variable_manager = variable_manager
 
-        # check to see if localhost is in the hosts list, as we
-        # may have it referenced via hostvars but if created implicitly
-        # it doesn't sow up in the hosts list
-        has_localhost = False
-        for host in hosts:
-            if host.name in C.LOCALHOST:
-                has_localhost = True
-                break
-
-        if not has_localhost:
-            new_host =  Host(name='localhost')
-            new_host.set_variable("ansible_python_interpreter", sys.executable)
-            new_host.set_variable("ansible_connection", "local")
-            new_host.address = '127.0.0.1'
-            hosts.append(new_host)
-
-        for host in hosts:
-            self._lookup[host.name] = host
+    def _find_host(self, host_name):
+        return self._inventory.get_host(host_name)
 
     def __getitem__(self, host_name):
+        host = self._find_host(host_name)
+        if host is None:
+            return j2undefined
 
-        if host_name not in self._lookup:
-            return j2undefined()
-
-        host = self._lookup.get(host_name)
         data = self._variable_manager.get_vars(loader=self._loader, host=host, play=self._play, include_hostvars=False)
 
+        #****************************************************
+        # TESTING REMOVAL OF THIS
+        #****************************************************
+        # Since we template much later now in 2.0, it may be completely unrequired to do
+        # a full template of the vars returned above, which is quite costly in time when
+        # the result is large.
         # Using cache in order to avoid template call
-        sha1_hash = sha1(str(data).encode('utf-8')).hexdigest()
-        if sha1_hash in self._cached_result:
-            result = self._cached_result[sha1_hash]
-        else:
-            templar = Templar(variables=data, loader=self._loader)
-            result = templar.template(data, fail_on_undefined=False, static_vars=STATIC_VARS)
-            self._cached_result[sha1_hash] = result
-        return result
+        #sha1_hash = sha1(str(data).encode('utf-8')).hexdigest()
+        #if sha1_hash in self._cached_result:
+        #    result = self._cached_result[sha1_hash]
+        #else:
+        #    templar = Templar(variables=data, loader=self._loader)
+        #    result = templar.template(data, fail_on_undefined=False, static_vars=STATIC_VARS)
+        #    self._cached_result[sha1_hash] = result
+        #return result
+        #****************************************************
+        return data
 
     def __contains__(self, host_name):
-        item = self.get(host_name)
-        if item and item is not j2undefined:
-            return True
-        return False
+        return self._find_host(host_name) is not None
 
     def __iter__(self):
-        for host in self._lookup:
+        for host in self._inventory.get_hosts(ignore_limits_and_restrictions=True):
             yield host
 
     def __len__(self):
-        return len(self._lookup)
+        return len(self._inventory.get_hosts(ignore_limits_and_restrictions=True))
 
-    def __getstate__(self):
-        return dict(
-            loader=self._loader,
-            lookup=self._lookup,
-            play=self._play,
-            var_manager=self._variable_manager,
-        )
-
-    def __setstate__(self, data):
-        self._play = data.get('play')
-        self._loader = data.get('loader')
-        self._lookup = data.get('lookup')
-        self._variable_manager = data.get('var_manager')
-        self._cached_result = dict()