correctly handle term signals

- adhoc now terminates gracefully
- avoid race condition on terminations by ignoring errors if
  worker might have been reaped between checking if active and termination call
- ansible-playbook now properly exits on sigint/term
- adhoc and playbook now give exceptions that we should not normally capture
  and rely on top level finally to reap children
- handle systemexit breaks in workers
- added debug to see at which frame we exit
partial fix for #14346
This commit is contained in:
Brian Coca 2016-02-06 00:53:01 -05:00
parent 45755bc0e5
commit 5a1887cc76
5 changed files with 21 additions and 8 deletions

View file

@ -21,6 +21,7 @@ __metaclass__ = type
######################################################## ########################################################
import os import os
import signal
from ansible import constants as C from ansible import constants as C
from ansible.cli import CLI from ansible.cli import CLI
@ -88,6 +89,10 @@ class AdHocCLI(CLI):
tasks = [ dict(action=dict(module=self.options.module_name, args=parse_kv(self.options.module_args)), async=async, poll=poll) ] tasks = [ dict(action=dict(module=self.options.module_name, args=parse_kv(self.options.module_args)), async=async, poll=poll) ]
) )
def _terminate(self, signum=None, framenum=None):
if signum is not None:
raise SystemExit("Interrupt detected, shutting down gracefully")
def run(self): def run(self):
''' use Runner lib to do SSH things ''' ''' use Runner lib to do SSH things '''
@ -170,6 +175,9 @@ class AdHocCLI(CLI):
# now create a task queue manager to execute the play # now create a task queue manager to execute the play
self._tqm = None self._tqm = None
try: try:
# Manage user interruptions
signal.signal(signal.SIGTERM, self._terminate)
self._tqm = TaskQueueManager( self._tqm = TaskQueueManager(
inventory=inventory, inventory=inventory,
variable_manager=variable_manager, variable_manager=variable_manager,
@ -180,6 +188,7 @@ class AdHocCLI(CLI):
run_additional_callbacks=C.DEFAULT_LOAD_CALLBACK_PLUGINS, run_additional_callbacks=C.DEFAULT_LOAD_CALLBACK_PLUGINS,
run_tree=run_tree, run_tree=run_tree,
) )
result = self._tqm.run(play) result = self._tqm.run(play)
finally: finally:
if self._tqm: if self._tqm:

View file

@ -69,7 +69,7 @@ class PlaybookExecutor:
may limit the runs to serialized groups, etc. may limit the runs to serialized groups, etc.
''' '''
signal.signal(signal.SIGINT, self._cleanup) signal.signal(signal.SIGTERM, self._terminate)
result = 0 result = 0
entrylist = [] entrylist = []
@ -199,7 +199,7 @@ class PlaybookExecutor:
finally: finally:
if self._tqm is not None: if self._tqm is not None:
self._cleanup() self._tqm.cleanup()
if self._options.syntax: if self._options.syntax:
display.display("No issues encountered") display.display("No issues encountered")
@ -207,8 +207,9 @@ class PlaybookExecutor:
return result return result
def _cleanup(self, signum=None, framenum=None): def _terminate(self, signum=None, framenum=None):
return self._tqm.cleanup() display.debug(framenum)
raise SystemExit("Terminating run due to external signal")
def _get_serialized_batches(self, play): def _get_serialized_batches(self, play):
''' '''

View file

@ -163,7 +163,7 @@ class ResultProcess(multiprocessing.Process):
except queue.Empty: except queue.Empty:
pass pass
except (KeyboardInterrupt, IOError, EOFError): except (KeyboardInterrupt, SystemExit, IOError, EOFError):
break break
except: except:
# TODO: we should probably send a proper callback here instead of # TODO: we should probably send a proper callback here instead of

View file

@ -132,7 +132,7 @@ class WorkerProcess(multiprocessing.Process):
self._rslt_q.put(task_result, block=False) self._rslt_q.put(task_result, block=False)
except Exception as e: except Exception as e:
if not isinstance(e, (IOError, EOFError, KeyboardInterrupt)) or isinstance(e, TemplateNotFound): if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound):
try: try:
self._host.vars = dict() self._host.vars = dict()
self._host.groups = [] self._host.groups = []
@ -140,7 +140,7 @@ class WorkerProcess(multiprocessing.Process):
self._rslt_q.put(task_result, block=False) self._rslt_q.put(task_result, block=False)
except: except:
debug(u"WORKER EXCEPTION: %s" % to_unicode(e)) debug(u"WORKER EXCEPTION: %s" % to_unicode(e))
debug(u"WORKER EXCEPTION: %s" % to_unicode(traceback.format_exc())) debug(u"WORKER TRACEBACK: %s" % to_unicode(traceback.format_exc()))
debug("WORKER PROCESS EXITING") debug("WORKER PROCESS EXITING")

View file

@ -253,7 +253,10 @@ class TaskQueueManager:
rslt_q.close() rslt_q.close()
main_q.close() main_q.close()
if worker_prc and worker_prc.is_alive(): if worker_prc and worker_prc.is_alive():
worker_prc.terminate() try:
worker_prc.terminate()
except AttributeError:
pass
def clear_failed_hosts(self): def clear_failed_hosts(self):
self._failed_hosts = dict() self._failed_hosts = dict()