ansible/test/runner/lib/util.py
Matt Clay a07d42e16d Add support for cloud tests to ansible-test. (#24315)
* Split out ansible-test docker functions.
* Add cloud support to ansible-test.
2017-05-05 16:23:00 +08:00

550 lines
14 KiB
Python

"""Miscellaneous utility functions and classes."""
from __future__ import absolute_import, print_function
import errno
import os
import pipes
import pkgutil
import shutil
import subprocess
import re
import sys
import time
def is_shippable():
    """Report whether the SHIPPABLE environment variable is set to the string 'true'.

    :rtype: bool
    """
    marker = os.environ.get('SHIPPABLE')
    return marker == 'true'
def remove_file(path):
    """Delete ``path`` when it is a regular file; anything else is left untouched.

    :type path: str
    """
    if not os.path.isfile(path):
        return

    os.remove(path)
def find_executable(executable, cwd=None, path=None, required=True):
    """Locate an executable program, either relative to ``cwd`` or on a search path.

    When ``executable`` contains a directory component it is resolved against ``cwd`` and, if it
    qualifies, returned exactly as it was given. Otherwise each directory of ``path`` is tried in
    order and the first qualifying candidate path is returned.

    Only regular files with execute permission qualify. A directory that merely shares the
    program's name is never returned, even though directories normally carry the execute bit.

    :param executable: program name, or a path to a program
    :type executable: str
    :param cwd: directory used to resolve relative locations, defaults to the current working directory
    :type cwd: str
    :param path: search path separated by ``os.pathsep``, defaults to the PATH environment variable
        and then to ``os.defpath``
    :type path: str
    :param required: True to raise when the program is missing, 'warning' to only display a warning,
        or a false value to silently return None
    :type required: bool | str
    :returns: the located program, or None when it was not found and no error was raised
    :rtype: str | None
    :raises ApplicationError: if the program is missing and ``required`` is true and not 'warning'
    """
    match = None
    real_cwd = os.getcwd()

    if not cwd:
        cwd = real_cwd

    if os.path.dirname(executable):
        target = os.path.join(cwd, executable)

        # isfile() rather than exists(): a directory passes an X_OK check but cannot be executed
        if os.path.isfile(target) and os.access(target, os.X_OK):
            match = executable
    else:
        if path is None:
            path = os.environ.get('PATH', os.defpath)

        if path:
            seen_dirs = set()

            for path_dir in path.split(os.pathsep):
                if path_dir in seen_dirs:
                    continue

                seen_dirs.add(path_dir)

                # a search entry referring to the real working directory is redirected to the requested cwd
                if os.path.abspath(path_dir) == real_cwd:
                    path_dir = cwd

                candidate = os.path.join(path_dir, executable)

                if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
                    match = candidate
                    break

    if not match and required:
        message = 'Required program "%s" not found.' % executable

        if required != 'warning':
            raise ApplicationError(message)

        display.warning(message)

    return match
def run_command(args, cmd, capture=False, env=None, data=None, cwd=None, always=False, stdin=None, stdout=None,
                cmd_verbosity=1):
    """Run a command through raw_command, honoring the explain setting of the given configuration.

    The command is only explained instead of executed when ``args.explain`` is set, unless ``always``
    is set. Every other argument is forwarded to raw_command unchanged.

    :type args: CommonConfig
    :type cmd: collections.Iterable[str]
    :type capture: bool
    :type env: dict[str, str] | None
    :type data: str | None
    :type cwd: str | None
    :type always: bool
    :type stdin: file | None
    :type stdout: file | None
    :type cmd_verbosity: int
    :rtype: str | None, str | None
    """
    explain = args.explain and not always

    options = dict(
        capture=capture,
        env=env,
        data=data,
        cwd=cwd,
        explain=explain,
        stdin=stdin,
        stdout=stdout,
        cmd_verbosity=cmd_verbosity,
    )

    return raw_command(cmd, **options)
def raw_command(cmd, capture=False, env=None, data=None, cwd=None, explain=False, stdin=None, stdout=None,
                cmd_verbosity=1):
    """Run a command, optionally feeding it input and capturing its output.

    The command line, working directory, resolved program and environment are reported through the
    module's display before anything is executed. When ``explain`` is set nothing is executed and
    ``(None, None)`` is returned.

    :param cmd: program followed by its arguments
    :type cmd: collections.Iterable[str]
    :param capture: pipe stderr, and stdout unless ``stdout`` is given, and return their decoded text
    :type capture: bool
    :param env: environment for the process, defaults to common_environment()
    :type env: dict[str, str] | None
    :param data: text sent to the process on stdin, ignored when ``stdin`` is given
    :type data: str | None
    :param cwd: working directory, defaults to the current working directory
    :type cwd: str | None
    :type explain: bool
    :type stdin: file | None
    :type stdout: file | None
    :param cmd_verbosity: verbosity level at which the command line itself is displayed
    :type cmd_verbosity: int
    :returns: stdout and stderr text; both are None when the process streams were not handled here
    :rtype: str | None, str | None
    :raises ApplicationError: if the program could not be found when starting the process
    :raises SubprocessError: if the process exits with a non-zero status
    """
    cwd = cwd or os.getcwd()
    env = env or common_environment()
    cmd = list(cmd)

    display.info('Run command: %s' % ' '.join(pipes.quote(arg) for arg in cmd), verbosity=cmd_verbosity)
    display.info('Working directory: %s' % cwd, verbosity=2)

    program = find_executable(cmd[0], cwd=cwd, path=env['PATH'], required='warning')

    if program:
        display.info('Program found: %s' % program, verbosity=2)

    for name in sorted(env):
        display.info('%s=%s' % (name, env[name]), verbosity=2)

    if explain:
        return None, None

    if stdin is not None:
        # an explicit stdin stream takes precedence over literal input data
        data = None
    elif data is not None:
        stdin = subprocess.PIPE

    # communicate() is used whenever stdin is supplied, stdout is redirected or output is captured
    communicate = bool(capture or stdout) or stdin is not None

    if capture and not stdout:
        stdout = subprocess.PIPE

    stderr = subprocess.PIPE if capture else None

    start = time.time()

    try:
        process = subprocess.Popen(cmd, env=env, stdin=stdin, stdout=stdout, stderr=stderr, cwd=cwd)
    except OSError as ex:
        if ex.errno == errno.ENOENT:
            raise ApplicationError('Required program "%s" not found.' % cmd[0])
        raise

    if communicate:
        encoding = 'utf-8'
        input_bytes = data.encode(encoding) if data else None
        raw_stdout, raw_stderr = process.communicate(input_bytes)
        stdout_text = raw_stdout.decode(encoding) if raw_stdout else u''
        stderr_text = raw_stderr.decode(encoding) if raw_stderr else u''
    else:
        process.wait()
        stdout_text = None
        stderr_text = None

    status = process.returncode
    runtime = time.time() - start

    display.info('Command exited with status %s after %s seconds.' % (status, runtime), verbosity=4)

    if status != 0:
        raise SubprocessError(cmd, status, stdout_text, stderr_text, runtime)

    return stdout_text, stderr_text
def common_environment():
    """Build the environment used for executing all programs.

    LC_ALL is fixed to 'en_US.UTF-8' and PATH is taken from the current environment (or os.defpath).
    HOME is passed through and must be set; HTTPTESTER and SSH_AUTH_SOCK are passed through when set.

    :rtype: dict[str, str]
    """
    environment = {
        'LC_ALL': 'en_US.UTF-8',
        'PATH': os.environ.get('PATH', os.defpath),
    }

    passed = pass_vars(
        required=('HOME',),
        optional=('HTTPTESTER', 'SSH_AUTH_SOCK'),
    )

    environment.update(passed)

    return environment
def pass_vars(required=None, optional=None):
    """Collect selected variables from the environment of the current process.

    :param required: names which must be set in the environment, or None for no required names
    :type required: collections.Iterable[str] | None
    :param optional: names which are included only when set, or None for no optional names
    :type optional: collections.Iterable[str] | None
    :returns: the requested variables which are set, mapped to their values
    :rtype: dict[str, str]
    :raises MissingEnvironmentVariable: if a required name is not set in the environment
    """
    env = {}

    # both parameters default to None, so an omitted group is treated as empty rather than iterated
    for name in required or ():
        if name not in os.environ:
            raise MissingEnvironmentVariable(name)

        env[name] = os.environ[name]

    for name in optional or ():
        if name in os.environ:
            env[name] = os.environ[name]

    return env
def deepest_path(path_a, path_b):
    """Return the deepest of two paths, or None if the paths are unrelated.

    A path of '.' is treated as the empty prefix, and an empty result is reported as '.'.
    Paths are compared as plain string prefixes, with ``path_a`` checked first.

    :type path_a: str
    :type path_b: str
    :rtype: str | None
    """
    first = '' if path_a == '.' else path_a
    second = '' if path_b == '.' else path_b

    for deeper, shallower in ((first, second), (second, first)):
        if deeper.startswith(shallower):
            return deeper or '.'

    return None
def remove_tree(path):
    """Recursively delete the directory tree at ``path``, ignoring a path which does not exist.

    :type path: str
    """
    try:
        shutil.rmtree(path)
    except OSError as error:
        if error.errno == errno.ENOENT:
            return

        raise
def make_dirs(path):
    """Create the directory ``path`` along with any missing parent directories.

    A directory which already exists is not an error.

    :type path: str
    :raises OSError: if creation fails for any reason other than the directory already existing,
        including ``path`` already existing as something other than a directory
    """
    try:
        os.makedirs(path)
    except OSError as ex:
        # EEXIST is also reported when path exists as a regular file, which must not count as success
        if ex.errno != errno.EEXIST or not os.path.isdir(path):
            raise
def is_binary_file(path):
    """Report whether the first 1024 bytes of the file at ``path`` contain a NUL byte.

    :type path: str
    :rtype: bool
    """
    with open(path, 'rb') as path_fd:
        head = path_fd.read(1024)

    return b'\0' in head
class Display(object):
    """Manages color console output."""
    # ANSI escape sequences used to colorize messages
    clear = '\033[0m'
    red = '\033[31m'
    green = '\033[32m'
    yellow = '\033[33m'
    blue = '\033[34m'
    purple = '\033[35m'
    cyan = '\033[36m'

    # color used by info() for each verbosity level; levels not listed here fall back to yellow
    verbosity_colors = {
        0: None,
        1: green,
        2: blue,
        3: cyan,
    }

    def __init__(self):
        self.verbosity = 0  # info() messages above this level are suppressed
        self.color = True  # when False, messages are written without color escape sequences
        self.warnings = []  # every warning shown so far, in order, for review_warnings()
        self.warnings_unique = set()  # messages already shown via warning(unique=True)
        self.info_stderr = False  # when True, info() writes to stderr instead of stdout

    def __warning(self, message):
        """Write a warning to stderr without recording it.

        :type message: str
        """
        self.print_message('WARNING: %s' % message, color=self.purple, fd=sys.stderr)

    def review_warnings(self):
        """Review all warnings which previously occurred."""
        if not self.warnings:
            return

        self.__warning('Reviewing previous %d warning(s):' % len(self.warnings))

        for warning in self.warnings:
            self.__warning(warning)

    def warning(self, message, unique=False):
        """Show a warning on stderr and record it for later review.

        :type message: str
        :param unique: when set, a message already shown as unique is neither shown nor recorded again
        :type unique: bool
        """
        if unique:
            if message in self.warnings_unique:
                return

            self.warnings_unique.add(message)

        self.__warning(message)
        self.warnings.append(message)

    def notice(self, message):
        """Show a notice on stderr.

        :type message: str
        """
        self.print_message('NOTICE: %s' % message, color=self.purple, fd=sys.stderr)

    def error(self, message):
        """Show an error on stderr.

        :type message: str
        """
        self.print_message('ERROR: %s' % message, color=self.red, fd=sys.stderr)

    def info(self, message, verbosity=0):
        """Show an informational message if the current verbosity is at least ``verbosity``.

        :type message: str
        :type verbosity: int
        """
        if self.verbosity >= verbosity:
            color = self.verbosity_colors.get(verbosity, self.yellow)
            self.print_message(message, color=color, fd=sys.stderr if self.info_stderr else sys.stdout)

    def print_message(self, message, color=None, fd=None):  # pylint: disable=locally-disabled, invalid-name
        """Write a message followed by a newline to a stream and flush the stream.

        :type message: str
        :param color: escape sequence applied to the message, only used when color output is enabled
        :type color: str | None
        :param fd: stream to write to, defaults to the sys.stdout in effect at the time of the call
        :type fd: file | None
        """
        # resolved per call so that a sys.stdout replaced after this class was defined is honored
        if fd is None:
            fd = sys.stdout

        if color and self.color:
            # convert color resets in message to desired color
            message = message.replace(self.clear, color)
            message = '%s%s%s' % (color, message, self.clear)

        print(message, file=fd)
        fd.flush()
class ApplicationError(Exception):
    """Base class for general errors raised by the application."""
class ApplicationWarning(Exception):
    """Application warning, raised as an exception so that it interrupts normal program flow."""
class SubprocessError(ApplicationError):
    """Error raised when a subprocess exits with a failure status."""
    def __init__(self, cmd, status=0, stdout=None, stderr=None, runtime=None):
        """Build the error message from the command, its status and any output it produced.

        Standard error is listed before standard output, and each non-empty stream is followed by a
        color reset sequence.

        :type cmd: list[str]
        :type status: int
        :type stdout: str | None
        :type stderr: str | None
        :type runtime: float | None
        """
        quoted_cmd = ' '.join(pipes.quote(c) for c in cmd)
        sections = ['Command "%s" returned exit status %s.\n' % (quoted_cmd, status)]

        for heading, text in (('>>> Standard Error\n', stderr), ('>>> Standard Output\n', stdout)):
            if text:
                sections.append(heading)
                sections.append('%s%s\n' % (text.strip(), Display.clear))

        super(SubprocessError, self).__init__(''.join(sections).strip())

        self.cmd = cmd
        self.status = status
        self.stdout = stdout
        self.stderr = stderr
        self.runtime = runtime
class MissingEnvironmentVariable(ApplicationError):
    """Error raised when an environment variable is not set."""
    def __init__(self, name):
        """Record the name of the missing variable and include it in the error message.

        :type name: str
        """
        message = 'Missing environment variable: %s' % name

        super(MissingEnvironmentVariable, self).__init__(message)

        self.name = name
class CommonConfig(object):
    """Settings shared by every command: color, explain, verbosity and debug."""
    def __init__(self, args):
        """Copy the common settings from the given arguments object.

        :param args: object providing ``color``, ``explain``, ``verbosity`` and ``debug`` attributes
        :type args: any
        """
        # each setting is copied verbatim from the attribute of the same name
        self.color = args.color  # type: bool
        self.explain = args.explain  # type: bool
        self.verbosity = args.verbosity  # type: int
        self.debug = args.debug  # type: bool
class EnvironmentConfig(CommonConfig):
    """Configuration common to all commands which execute in an environment."""
    def __init__(self, args, command):
        """Derive the environment settings from the given arguments object.

        :param args: object providing the parsed options read below
        :type args: any
        :param command: stored unchanged as ``self.command``
        """
        super(EnvironmentConfig, self).__init__(args)

        self.command = command

        self.local = args.local is True

        tox = args.tox

        if tox is not True and tox is not False and tox is not None:
            # any other tox value is used as the python version and implies tox is enabled
            self.tox = True
            self.tox_args = 1
            self.python = tox  # type: str
        else:
            self.tox = tox is True
            self.tox_args = 0
            self.python = args.python if 'python' in args else None  # type: str

        self.docker = docker_qualify_image(args.docker)  # type: str
        self.remote = args.remote  # type: str

        self.docker_privileged = args.docker_privileged if 'docker_privileged' in args else False  # type: bool
        self.docker_util = docker_qualify_image(args.docker_util if 'docker_util' in args else '')  # type: str
        self.docker_pull = args.docker_pull if 'docker_pull' in args else False  # type: bool

        self.tox_sitepackages = args.tox_sitepackages  # type: bool

        self.remote_stage = args.remote_stage  # type: str
        self.remote_aws_region = args.remote_aws_region  # type: str

        self.requirements = args.requirements  # type: bool

        # major.minor version of the interpreter running this code
        current_version = '.'.join(str(part) for part in sys.version_info[:2])

        if self.python == 'default':
            self.python = current_version

        self.python_version = self.python or current_version

        self.delegate = self.tox or self.docker or self.remote

        # requirements are always enabled when delegating to tox, docker or a remote
        if self.delegate:
            self.requirements = True
def docker_qualify_image(name):
    """Expand a bare image name to 'ansible/ansible:<name>'.

    An empty name, or a name which already contains '/' or ':', is returned unchanged.

    :type name: str
    :rtype: str
    """
    if not name:
        return name

    if '/' in name or ':' in name:
        return name

    return 'ansible/ansible:%s' % name
def parse_to_dict(pattern, value):
    """Search ``value`` for ``pattern`` and return the named groups of the first match.

    :param pattern: regular expression, searched for anywhere in the value
    :type pattern: str
    :type value: str
    :rtype: dict[str, str]
    :raises Exception: if the pattern does not match the value
    """
    found = re.search(pattern, value)

    if found is not None:
        return found.groupdict()

    raise Exception('Pattern "%s" did not match value: %s' % (pattern, value))
def get_subclasses(class_type):
    """Collect every direct and indirect subclass of the given class.

    The class itself is not part of the result.

    :type class_type: type
    :rtype: set[type]
    """
    found = set()
    pending = [class_type]

    while pending:
        current = pending.pop()
        fresh = [child for child in current.__subclasses__() if child not in found]

        # newly discovered classes are recorded once and then visited for their own subclasses
        found.update(fresh)
        pending.extend(fresh)

    return found
def import_plugins(directory):
    """Import every module found in the named directory next to this file.

    Modules are imported under the name 'lib.<directory>.<module>'.

    :type directory: str
    """
    plugin_path = os.path.join(os.path.dirname(__file__), directory)

    for module_info in pkgutil.iter_modules([plugin_path], prefix='lib.%s.' % directory):
        # the second item of each entry is the prefixed module name
        __import__(module_info[1])
def load_plugins(base_type, database):
    """Register every subclass of ``base_type`` in ``database``.

    Each subclass is stored under the third dotted component of the name of its defining module.

    :type base_type: type
    :type database: dict[str, type]
    """
    plugins = {}  # type: dict [str, type]

    # all keys are computed before the database is touched
    for subclass in get_subclasses(base_type):
        plugins[subclass.__module__.split('.')[2]] = subclass

    for name, plugin_type in plugins.items():
        database[name] = plugin_type
# Module-level Display instance, used by the functions in this module for their console output.
display = Display()  # pylint: disable=locally-disabled, invalid-name