6842c0467f
Previously empty test targets were ignored by ansible-test. This would prevent them from participating in dependency analysis. These targets are actually empty roles, and should be processed as such.
674 lines
21 KiB
Python
674 lines
21 KiB
Python
"""Test target identification, iteration and inclusion/exclusion."""
|
|
|
|
from __future__ import absolute_import, print_function
|
|
|
|
import collections
|
|
import os
|
|
import re
|
|
import errno
|
|
import itertools
|
|
import abc
|
|
import sys
|
|
|
|
from lib.util import (
|
|
ApplicationError,
|
|
display,
|
|
read_lines_without_comments,
|
|
)
|
|
|
|
# File extensions which identify module sources.
# Used as the extension filter when walking module targets and when deciding whether a test target is a module.
MODULE_EXTENSIONS = '.py', '.ps1'
|
|
|
|
|
|
def find_target_completion(target_func, prefix):
    """
    Return the completion candidates matching the given prefix.
    Any exception raised while computing them is returned as the only candidate instead of propagating.
    :type target_func: () -> collections.Iterable[CompletionTarget]
    :type prefix: unicode
    :rtype: list[str]
    """
    try:
        candidates = target_func()

        running_python2 = sys.version_info[0] == 2

        if running_python2:
            # the prefix is documented as unicode, encode it before matching on Python 2
            prefix = prefix.encode()

        # double tab completion from bash
        double_tab = os.environ.get('COMP_TYPE') == '63'

        return walk_completion_targets(candidates, prefix, double_tab)
    except Exception as ex:  # pylint: disable=locally-disabled, broad-except
        return [u'%s' % ex]
|
|
|
|
|
|
def walk_completion_targets(targets, prefix, short=False):
    """
    Collect the target aliases which complete the given prefix without descending past the next path segment.
    With short enabled, multiple results are returned relative to the directory portion of the prefix.
    :type targets: collections.Iterable[CompletionTarget]
    :type prefix: str
    :type short: bool
    :rtype: tuple[str]
    """
    aliases = set()

    for target in targets:
        aliases.update(target.aliases)

    if prefix.endswith('/'):
        # a directory alias which has been typed in full is not offered as a completion of itself
        aliases.discard(prefix)

    prefix_length = len(prefix)
    matches = []

    for alias in aliases:
        if not alias.startswith(prefix):
            continue

        # the last character is ignored so the trailing slash of a directory alias does not count as a deeper level
        remainder = alias[prefix_length:-1]

        if '/' not in remainder:
            matches.append(alias)

    if short:
        parent = os.path.dirname(prefix)
        offset = len(parent) + 1 if parent else 0
        relative_matches = [match[offset:] for match in matches if len(match) > offset]

        # a single candidate is left untouched, only multiple candidates are shortened
        if len(relative_matches) > 1:
            matches = relative_matches

    return tuple(sorted(matches))
|
|
|
|
|
|
def walk_internal_targets(targets, includes=None, excludes=None, requires=None):
    """
    Select the targets matching the include patterns, restricted to those matching the require patterns
    and without those matching the exclude patterns. The result is sorted by target name.
    :type targets: collections.Iterable[T <= CompletionTarget]
    :type includes: list[str]
    :type excludes: list[str]
    :type requires: list[str]
    :rtype: tuple[T <= CompletionTarget]
    """
    def by_name(target):
        """Sort key: the name of the target."""
        return target.name

    all_targets = tuple(targets)

    selected = list(filter_targets(all_targets, includes, errors=True, directories=False))
    selected.sort(key=by_name)

    if requires:
        required = set(filter_targets(all_targets, requires, errors=True, directories=False))
        selected = [target for target in selected if target in required]

    if excludes:
        # the results are discarded, the generator is only exhausted so unmatched exclude patterns raise an error
        for _ in filter_targets(all_targets, excludes, errors=True, include=False, directories=False):
            pass

    remaining = set(filter_targets(selected, excludes, errors=False, include=False, directories=False))

    return tuple(sorted(remaining, key=by_name))
|
|
|
|
|
|
def walk_external_targets(targets, includes=None, excludes=None, requires=None):
    """
    Compute the included and excluded targets for the given patterns, both sorted by target name.
    Consecutive directory targets sharing a name are collapsed into one target with the union of their modules.
    :type targets: collections.Iterable[CompletionTarget]
    :type includes: list[str]
    :type excludes: list[str]
    :type requires: list[str]
    :rtype: tuple[CompletionTarget], tuple[CompletionTarget]
    """
    def by_name(target):
        """Sort key: the name of the target."""
        return target.name

    def collapse(sorted_targets):
        """Merge each run of same-named directory targets into the first target of the run."""
        collapsed = []
        previous = None

        for target in sorted_targets:
            same_directory = isinstance(previous, DirectoryTarget) and isinstance(target, DirectoryTarget) \
                and previous.name == target.name

            if same_directory:
                previous.modules = tuple(set(previous.modules) | set(target.modules))
                continue

            collapsed.append(target)
            previous = target

        return collapsed

    all_targets = tuple(targets)
    include_targets = None

    if requires:
        candidates = list(filter_targets(all_targets, includes, errors=True, directories=False))
        required = set(filter_targets(all_targets, requires, errors=True, directories=False))

        # from here on the include patterns are the names of the included targets which are also required
        includes = [target.name for target in candidates if target in required]

        if not includes:
            include_targets = []

    if include_targets is None:
        include_targets = sorted(filter_targets(all_targets, includes, errors=True), key=by_name)

    if excludes:
        exclude_targets = sorted(filter_targets(all_targets, excludes, errors=True), key=by_name)
    else:
        exclude_targets = []

    include = collapse(include_targets)
    exclude = collapse(exclude_targets)

    return tuple(include), tuple(exclude)
|
|
|
|
|
|
def filter_targets(targets, patterns, include=True, directories=True, errors=True):
    """
    Yield the targets whose aliases match (include=True) or do not match (include=False) the given patterns.
    Each pattern is a regular expression which must match a whole alias.
    With directories enabled, a target matched through a directory alias is replaced by a directory target.
    With errors enabled, patterns which matched nothing raise TargetPatternsNotMatched once iteration completes.
    :type targets: collections.Iterable[CompletionTarget]
    :type patterns: list[str]
    :type include: bool
    :type directories: bool
    :type errors: bool
    :rtype: collections.Iterable[CompletionTarget]
    """
    def directory_for(target, alias):
        """Return the base path of the target when it is longer than the directory alias, otherwise the alias."""
        base_path = target.base_path

        if base_path and len(base_path) > len(alias):
            return base_path

        return alias

    unmatched = set(patterns or ())
    compiled_patterns = dict((p, re.compile('^%s$' % p)) for p in patterns) if patterns else None

    for target in targets:
        matched_directories = set()
        match = False

        if patterns:
            for alias in target.aliases:
                for pattern in patterns:
                    if not compiled_patterns[pattern].match(alias):
                        continue

                    match = True
                    unmatched.discard(pattern)

                    if alias.endswith('/'):
                        matched_directories.add(directory_for(target, alias))
        elif include:
            # without patterns every target is included
            match = True

            if not target.base_path:
                matched_directories.add('.')

            matched_directories.update(directory_for(target, alias) for alias in target.aliases if alias.endswith('/'))

        if match != include:
            continue

        if directories and matched_directories:
            # report the shortest matched directory
            yield DirectoryTarget(min(matched_directories, key=len), target.modules)
        else:
            yield target

    if errors and unmatched:
        raise TargetPatternsNotMatched(unmatched)
|
|
|
|
|
|
def walk_module_targets():
    """
    Iterate over the test targets found under lib/ansible/modules which are modules.
    :rtype: collections.Iterable[TestTarget]
    """
    module_root = 'lib/ansible/modules'

    for candidate in walk_test_targets(module_root, module_root + '/', extensions=MODULE_EXTENSIONS):
        # targets without a module name are skipped
        if candidate.module:
            yield candidate
|
|
|
|
|
|
def walk_units_targets():
    """
    Iterate over the unit test targets: the Python files under test/units whose name starts with ``test_``.
    :rtype: collections.Iterable[TestTarget]
    """
    options = dict(
        path='test/units',
        module_path='test/units/modules/',
        extensions=('.py',),
        prefix='test_',
    )

    return walk_test_targets(**options)
|
|
|
|
|
|
def walk_compile_targets():
    """
    Iterate over the compile test targets: the Python files of the source tree plus the regular files in bin.
    :rtype: collections.Iterable[TestTarget]
    """
    options = dict(
        module_path='lib/ansible/modules/',
        extensions=('.py',),
        extra_dirs=('bin',),
    )

    return walk_test_targets(**options)
|
|
|
|
|
|
def walk_sanity_targets():
    """
    Iterate over the sanity test targets: every file found by walk_test_targets with no extension or prefix filter.
    :rtype: collections.Iterable[TestTarget]
    """
    module_path = 'lib/ansible/modules/'

    return walk_test_targets(module_path=module_path)
|
|
|
|
|
|
def walk_posix_integration_targets(include_hidden=False):
    """
    Iterate over the integration targets which have the ``posix/`` alias.
    With include_hidden, targets which have the ``hidden/posix/`` alias are yielded as well.
    :type include_hidden: bool
    :rtype: collections.Iterable[IntegrationTarget]
    """
    wanted = ['posix/']

    if include_hidden:
        wanted.append('hidden/posix/')

    for target in walk_integration_targets():
        if any(alias in target.aliases for alias in wanted):
            yield target
|
|
|
|
|
|
def walk_network_integration_targets(include_hidden=False):
    """
    Iterate over the integration targets which have the ``network/`` alias.
    With include_hidden, targets which have the ``hidden/network/`` alias are yielded as well.
    :type include_hidden: bool
    :rtype: collections.Iterable[IntegrationTarget]
    """
    wanted = ['network/']

    if include_hidden:
        wanted.append('hidden/network/')

    for target in walk_integration_targets():
        if any(alias in target.aliases for alias in wanted):
            yield target
|
|
|
|
|
|
def walk_windows_integration_targets(include_hidden=False):
    """
    Iterate over the integration targets which have the ``windows/`` alias.
    With include_hidden, targets which have the ``hidden/windows/`` alias are yielded as well.
    :type include_hidden: bool
    :rtype: collections.Iterable[IntegrationTarget]
    """
    wanted = ['windows/']

    if include_hidden:
        wanted.append('hidden/windows/')

    for target in walk_integration_targets():
        if any(alias in target.aliases for alias in wanted):
            yield target
|
|
|
|
|
|
def walk_integration_targets():
    """
    Iterate over the integration targets: one for each directory in test/integration/targets, in sorted path order.
    :rtype: collections.Iterable[IntegrationTarget]
    """
    targets_root = 'test/integration/targets'

    module_names = frozenset(target.module for target in walk_module_targets())

    entries = [os.path.join(targets_root, entry) for entry in os.listdir(targets_root)]
    entries.sort()

    prefixes = load_integration_prefixes()

    for entry in entries:
        # only directories are targets
        if not os.path.isdir(entry):
            continue

        yield IntegrationTarget(entry, module_names, prefixes)
|
|
|
|
|
|
def load_integration_prefixes():
    """
    Load the ``target-prefixes.*`` files found in test/integration.
    Every line of such a file becomes a key which is mapped to the extension of the file, without the leading dot.
    Files are processed in sorted name order, so later files override the keys of earlier ones.
    :rtype: dict[str, str]
    """
    base_dir = 'test/integration'
    prefixes = {}

    for file_name in sorted(os.listdir(base_dir)):
        stem, extension = os.path.splitext(file_name)

        if stem != 'target-prefixes':
            continue

        group = extension[1:]

        with open(os.path.join(base_dir, file_name), 'r') as prefix_fd:
            for line in prefix_fd.read().splitlines():
                prefixes[line] = group

    return prefixes
|
|
|
|
|
|
def walk_test_targets(path=None, module_path=None, extensions=None, prefix=None, extra_dirs=None):
    """
    Iterate over the files below the given path (or the current directory) which pass the given filters.
    The regular, non-symlink files found directly in each of the extra directories are yielded afterwards, unfiltered.
    :type path: str | None
    :type module_path: str | None
    :type extensions: tuple[str] | None
    :type prefix: str | None
    :type extra_dirs: tuple[str] | None
    :rtype: collections.Iterable[TestTarget]
    """
    for root, _, file_names in os.walk(path or '.', topdown=False):
        if root.endswith('/__pycache__') or '/.tox/' in root:
            continue

        if path is None:
            # drop the first two characters of the root, which is the './' of roots below the current directory
            root = root[2:]

        if root.startswith('.') and root != '.github':
            continue

        for file_name in file_names:
            name, ext = os.path.splitext(os.path.basename(file_name))

            rejected = name.startswith('.') \
                or (extensions and ext not in extensions) \
                or (prefix and not name.startswith(prefix))

            if rejected:
                continue

            file_path = os.path.join(root, file_name)

            # symlinks are skipped, with a special case to allow a symlink of ansible_release.py -> ../release.py
            if os.path.islink(file_path) and file_path != 'lib/ansible/module_utils/ansible_release.py':
                continue

            yield TestTarget(file_path, module_path, prefix, path)

    for extra_dir in extra_dirs or ():
        for file_name in os.listdir(extra_dir):
            file_path = os.path.join(extra_dir, file_name)

            if not os.path.isfile(file_path) or os.path.islink(file_path):
                continue

            yield TestTarget(file_path, module_path, prefix, path)
|
|
|
|
|
|
def analyze_integration_target_dependencies(integration_targets):
    """
    Map each integration target name to the names of the targets which depend on it, directly or indirectly.
    Dependencies are collected from the setup and needs/target declarations of each target, from symlinks which
    point into another target and from mentions of hidden role targets in the files of a target's meta directory.
    The resulting mapping is also reported through display.info at verbosity 4.
    :type integration_targets: list[IntegrationTarget]
    :rtype: dict[str,set[str]]
    """
    real_target_root = os.path.realpath('test/integration/targets') + '/'

    role_targets = [t for t in integration_targets if t.type == 'role']
    hidden_role_target_names = set(t.name for t in role_targets if 'hidden/' in t.aliases)

    dependencies = collections.defaultdict(set)

    # handle setup dependencies
    for target in integration_targets:
        for setup_target_name in target.setup_always + target.setup_once:
            dependencies[setup_target_name].add(target.name)

    # handle target dependencies
    for target in integration_targets:
        for need_target in target.needs_target:
            dependencies[need_target].add(target.name)

    # handle symlink dependencies between targets
    # this use case is supported, but discouraged
    for target in integration_targets:
        for root, _dummy, file_names in os.walk(target.path):
            for name in file_names:
                path = os.path.join(root, name)

                if not os.path.islink(path):
                    continue

                real_link_path = os.path.realpath(path)

                if not real_link_path.startswith(real_target_root):
                    continue

                # the first path segment below the target root is the name of the target which is linked to
                link_target = real_link_path[len(real_target_root):].split('/')[0]

                if link_target == target.name:
                    continue

                dependencies[link_target].add(target.name)

    # intentionally primitive analysis of role meta to avoid a dependency on pyyaml
    # script based targets are scanned as they may execute a playbook with role dependencies
    for target in integration_targets:
        meta_dir = os.path.join(target.path, 'meta')

        if not os.path.isdir(meta_dir):
            continue

        meta_paths = sorted([os.path.join(meta_dir, name) for name in os.listdir(meta_dir)])

        for meta_path in meta_paths:
            # only regular files are read: a sub-directory of meta exists as well, but cannot be opened
            if not os.path.isfile(meta_path):
                continue

            with open(meta_path, 'r') as meta_fd:
                meta_lines = meta_fd.read().splitlines()

            for meta_line in meta_lines:
                # skip comment lines
                if re.search(r'^ *#.*$', meta_line):
                    continue

                if not meta_line.strip():
                    continue

                # any mention of a hidden role target name on the line counts as a dependency on it
                for hidden_target_name in hidden_role_target_names:
                    if hidden_target_name in meta_line:
                        dependencies[hidden_target_name].add(target.name)

    # add the indirect dependencies, repeating until a full pass over the mapping adds nothing
    while True:
        changes = 0

        for dependent_target_names in dependencies.values():
            for dependent_target_name in list(dependent_target_names):
                new_target_names = dependencies.get(dependent_target_name)

                if new_target_names:
                    for new_target_name in new_target_names:
                        if new_target_name not in dependent_target_names:
                            dependent_target_names.add(new_target_name)
                            changes += 1

        if not changes:
            break

    for target_name in sorted(dependencies):
        consumers = dependencies[target_name]

        if not consumers:
            continue

        display.info('%s:' % target_name, verbosity=4)

        for consumer in sorted(consumers):
            display.info(' %s' % consumer, verbosity=4)

    return dependencies
|
|
|
|
|
|
class CompletionTarget(object):
    """
    Command-line argument completion target base class.
    Equality and hashing are based on the representation of the target, ordering on its name.
    """
    __metaclass__ = abc.ABCMeta

    def __init__(self):
        self.name = None
        self.path = None
        self.base_path = None
        self.modules = ()
        self.aliases = ()

    def __eq__(self, other):
        if not isinstance(other, CompletionTarget):
            return False

        return self.__repr__() == other.__repr__()

    def __ne__(self, other):
        return not self.__eq__(other)

    def __lt__(self, other):
        return self.name.__lt__(other.name)

    def __gt__(self, other):
        return self.name.__gt__(other.name)

    def __hash__(self):
        return hash(self.__repr__())

    def __repr__(self):
        if not self.modules:
            return self.name

        # targets with modules list them after the name
        return '%s (%s)' % (self.name, ', '.join(self.modules))
|
|
|
|
|
|
class DirectoryTarget(CompletionTarget):
    """Directory target, named after its path."""
    def __init__(self, path, modules):
        """
        :type path: str
        :type modules: tuple[str]
        """
        super(DirectoryTarget, self).__init__()

        self.modules = modules

        # the path doubles as the name of the target
        self.name = self.path = path
|
|
|
|
|
|
class TestTarget(CompletionTarget):
    """Generic test target, representing a single file."""
    def __init__(self, path, module_path, module_prefix, base_path):
        """
        :type path: str
        :type module_path: str | None
        :type module_prefix: str | None
        :type base_path: str
        """
        super(TestTarget, self).__init__()

        self.name = self.path = path

        if base_path:
            self.base_path = base_path + '/'
        else:
            self.base_path = None

        stem, extension = os.path.splitext(os.path.basename(path))

        self.module = None
        self.modules = ()

        if module_path and path.startswith(module_path) and stem != '__init__' and extension in MODULE_EXTENSIONS:
            # the module name is the file name without the module prefix and without leading underscores
            self.module = stem[len(module_prefix or ''):].lstrip('_')
            self.modules = (self.module,)

        segments = path.split('/')

        # the aliases are the path, the module name (if any) and every parent directory of the path
        candidates = [path, self.module]
        candidates.extend('%s/' % '/'.join(segments[:depth]) for depth in range(1, len(segments)))

        self.aliases = tuple(sorted(alias for alias in candidates if alias))
|
|
|
|
|
|
class IntegrationTarget(CompletionTarget):
    """Integration test target, representing a directory containing a script, a special test or a role."""
    non_posix = frozenset([
        'network',
        'windows',
    ])

    categories = non_posix | frozenset([
        'posix',
        'module',
        'needs',
        'skip',
    ])

    def __init__(self, path, modules, prefixes):
        """
        :type path: str
        :type modules: frozenset[str]
        :type prefixes: dict[str, str]
        """
        super(IntegrationTarget, self).__init__()

        name = os.path.basename(path)

        self.name = name
        self.path = path

        # script_path and type

        contents = sorted(os.listdir(path))

        runme_files = [c for c in contents if os.path.splitext(c)[0] == 'runme']
        has_test_files = any(os.path.splitext(c)[0] == 'test' for c in contents)

        self.script_path = None

        if runme_files:
            self.type = 'script'
            self.script_path = os.path.join(path, runme_files[0])
        elif has_test_files:
            self.type = 'special'
        else:
            # everything else is a role, with or without a tasks or defaults directory
            # ansible will consider these empty roles, so ansible-test should as well
            self.type = 'role'

        # static_aliases

        aliases_path = os.path.join(path, 'aliases')

        try:
            static_aliases = tuple(read_lines_without_comments(aliases_path, remove_blank_lines=True))
        except IOError as ex:
            # a missing aliases file means there are no static aliases, any other error is unexpected
            if ex.errno != errno.ENOENT:
                raise

            static_aliases = ()

        # modules

        module_name = None

        if name in modules:
            module_name = name
        elif name.startswith('win_'):
            unprefixed_name = name[4:]

            if unprefixed_name in modules:
                module_name = unprefixed_name

        self.modules = tuple(sorted(a for a in static_aliases + (module_name,) if a in modules))

        # groups

        groups = [self.type]
        groups.extend(a for a in static_aliases if a not in modules)
        groups.extend('module/%s' % m for m in self.modules)

        if not self.modules:
            groups.append('non_module')

        if 'destructive' not in groups:
            groups.append('non_destructive')

        prefix = name[:name.find('_')] if '_' in name else None

        if prefix in prefixes:
            group = prefixes[prefix]
            groups.append(group if group == prefix else '%s/%s' % (group, prefix))

        for name_prefix, implied_group in (('win_', 'windows'), ('connection_', 'connection')):
            if name.startswith(name_prefix):
                groups.append(implied_group)

        if name.startswith(('setup_', 'prepare_')):
            groups.append('hidden')

        if self.type not in ('script', 'role'):
            groups.append('hidden')

        # Collect file paths before group expansion to avoid including the directories.
        # Ignore references to test targets, as those must be defined using `needs/target/*` or other target references.
        needs_file = set()

        for group in groups:
            if not group.startswith('needs/file/'):
                continue

            if group.startswith('needs/file/test/integration/targets/'):
                continue

            needs_file.add('/'.join(group.split('/')[2:]))

        self.needs_file = tuple(sorted(needs_file))

        # add every parent of each nested group, iterating over a copy since the list grows while it is expanded
        for group in list(groups):
            parts = group.split('/')
            groups.extend('/'.join(parts[:i]) for i in range(1, len(parts)))

        # targets which are in none of the non-posix groups are posix targets
        if self.non_posix.isdisjoint(groups):
            groups.append('posix')

        # aliases

        aliases = [name]
        aliases.extend('%s/' % g for g in groups)
        aliases.extend('%s/%s' % (g, name) for g in groups if g not in self.categories)

        if 'hidden/' in aliases:
            # every alias of a hidden target is moved below hidden/
            aliases = ['hidden/'] + ['hidden/%s' % a for a in aliases if not a.startswith('hidden/')]

        self.aliases = tuple(sorted(set(aliases)))

        # configuration

        def third_segments(group_prefix):
            """Return the sorted, unique third path segments of the groups starting with the given prefix."""
            return tuple(sorted(set(g.split('/')[2] for g in groups if g.startswith(group_prefix))))

        self.setup_once = third_segments('setup/once/')
        self.setup_always = third_segments('setup/always/')
        self.needs_target = third_segments('needs/target/')
|
|
|
|
|
|
class TargetPatternsNotMatched(ApplicationError):
    """One or more targets were not matched when a match was required."""
    def __init__(self, patterns):
        """
        :type patterns: set[str]
        """
        ordered = sorted(patterns)

        # kept on the instance in sorted order
        self.patterns = ordered

        if len(patterns) <= 1:
            message = 'Target pattern not matched: %s' % ordered[0]
        else:
            message = 'Target patterns not matched:\n%s' % '\n'.join(ordered)

        super(TargetPatternsNotMatched, self).__init__(message)
|