ansible/test/lib/ansible_test/_internal/thread.py
Matt Clay d651bda123
Relocate ansible-test code. (#60147)
* Initial move of `test/runner/` content.

`test/runner/lib/` -> `test/lib/ansible_test/_internal/`
`test/runner/`     -> `test/lib/ansible_test/_internal/data/`

* Initial move of `test/sanity/` content.

`test/sanity/` -> `test/lib/ansible_test/_internal/data/sanity/` (except `test/sanity/ignore.txt`)

* Initial move of `test/units/pytest/` content.

`test/units/pytest/` -> `test/lib/ansible_test/_internal/data/pytest/`

* Follow-up move of `test/runner/unit/` content.

`test/lib/ansible_test/_internal/data/unit/` -> `test/lib/ansible_test/tests/unit/`

* Initial move of `ansible.cfg` content.

`test/units/ansible.cfg` -> `test/lib/ansible_test/_internal/data/units/ansible.cfg`
`test/env/ansible.cfg` -> `test/lib/ansible_test/_internal/data/env/ansible.cfg`

* Follow-up move of `data` directory.

`test/lib/ansible_test/_internal/data/` -> `test/lib/ansible_test/_data/`

* Update import statements.

* Add missing __init__.py for unit tests.

* Fix path references and miscellaneous issues.
2019-08-06 14:43:29 -07:00

57 lines
1.8 KiB
Python

"""Python threading tools."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import threading
import sys
try:
# noinspection PyPep8Naming
import Queue as queue
except ImportError:
# noinspection PyUnresolvedReferences
import queue # pylint: disable=locally-disabled, import-error
class WrappedThread(threading.Thread):
"""Wrapper around Thread which captures results and exceptions."""
def __init__(self, action):
"""
:type action: () -> any
"""
# noinspection PyOldStyleClasses
super(WrappedThread, self).__init__()
self._result = queue.Queue()
self.action = action
self.result = None
def run(self):
"""
Run action and capture results or exception.
Do not override. Do not call directly. Executed by the start() method.
"""
# We truly want to catch anything that the worker thread might do including call sys.exit.
# Therefore we catch *everything* (including old-style class exceptions)
# noinspection PyBroadException, PyPep8
try:
self._result.put((self.action(), None))
# pylint: disable=locally-disabled, bare-except
except: # noqa
self._result.put((None, sys.exc_info()))
def wait_for_result(self):
"""
Wait for thread to exit and return the result or raise an exception.
:rtype: any
"""
result, exception = self._result.get()
if exception:
if sys.version_info[0] > 2:
raise exception[1].with_traceback(exception[2])
# noinspection PyRedundantParentheses
exec('raise exception[0], exception[1], exception[2]') # pylint: disable=locally-disabled, exec-used
self.result = result
return result