ansible/test/lib/ansible_test/_internal/classification.py
2020-03-27 15:56:02 -07:00

898 lines
31 KiB
Python

"""Classify changes in Ansible code."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import collections
import os
import re
import time
from . import types as t
from .target import (
walk_module_targets,
walk_integration_targets,
walk_units_targets,
walk_compile_targets,
walk_sanity_targets,
load_integration_prefixes,
analyze_integration_target_dependencies,
)
from .util import (
display,
is_subdir,
)
from .import_analysis import (
get_python_module_utils_imports,
get_python_module_utils_name,
)
from .csharp_import_analysis import (
get_csharp_module_utils_imports,
get_csharp_module_utils_name,
)
from .powershell_import_analysis import (
get_powershell_module_utils_imports,
get_powershell_module_utils_name,
)
from .config import (
TestConfig,
IntegrationConfig,
)
from .metadata import (
ChangeDescription,
)
from .data import (
data_context,
)
FOCUSED_TARGET = '__focused__'
def categorize_changes(args, paths, verbose_command=None):
"""
:type args: TestConfig
:type paths: list[str]
:type verbose_command: str
:rtype: ChangeDescription
"""
mapper = PathMapper(args)
commands = {
'sanity': set(),
'units': set(),
'integration': set(),
'windows-integration': set(),
'network-integration': set(),
}
focused_commands = collections.defaultdict(set)
deleted_paths = set()
original_paths = set()
additional_paths = set()
no_integration_paths = set()
for path in paths:
if not os.path.exists(path):
deleted_paths.add(path)
continue
original_paths.add(path)
dependent_paths = mapper.get_dependent_paths(path)
if not dependent_paths:
continue
display.info('Expanded "%s" to %d dependent file(s):' % (path, len(dependent_paths)), verbosity=2)
for dependent_path in dependent_paths:
display.info(dependent_path, verbosity=2)
additional_paths.add(dependent_path)
additional_paths -= set(paths) # don't count changed paths as additional paths
if additional_paths:
display.info('Expanded %d changed file(s) into %d additional dependent file(s).' % (len(paths), len(additional_paths)))
paths = sorted(set(paths) | additional_paths)
display.info('Mapping %d changed file(s) to tests.' % len(paths))
none_count = 0
for path in paths:
tests = mapper.classify(path)
if tests is None:
focused_target = False
display.info('%s -> all' % path, verbosity=1)
tests = all_tests(args) # not categorized, run all tests
display.warning('Path not categorized: %s' % path)
else:
focused_target = tests.pop(FOCUSED_TARGET, False) and path in original_paths
tests = dict((key, value) for key, value in tests.items() if value)
if focused_target and not any('integration' in command for command in tests):
no_integration_paths.add(path) # path triggers no integration tests
if verbose_command:
result = '%s: %s' % (verbose_command, tests.get(verbose_command) or 'none')
# identify targeted integration tests (those which only target a single integration command)
if 'integration' in verbose_command and tests.get(verbose_command):
if not any('integration' in command for command in tests if command != verbose_command):
if focused_target:
result += ' (focused)'
result += ' (targeted)'
else:
result = '%s' % tests
if not tests.get(verbose_command):
# minimize excessive output from potentially thousands of files which do not trigger tests
none_count += 1
verbosity = 2
else:
verbosity = 1
if args.verbosity >= verbosity:
display.info('%s -> %s' % (path, result), verbosity=1)
for command, target in tests.items():
commands[command].add(target)
if focused_target:
focused_commands[command].add(target)
if none_count > 0 and args.verbosity < 2:
display.notice('Omitted %d file(s) that triggered no tests.' % none_count)
for command in commands:
commands[command].discard('none')
if any(target == 'all' for target in commands[command]):
commands[command] = set(['all'])
commands = dict((c, sorted(commands[c])) for c in commands if commands[c])
focused_commands = dict((c, sorted(focused_commands[c])) for c in focused_commands)
for command in commands:
if commands[command] == ['all']:
commands[command] = [] # changes require testing all targets, do not filter targets
changes = ChangeDescription()
changes.command = verbose_command
changes.changed_paths = sorted(original_paths)
changes.deleted_paths = sorted(deleted_paths)
changes.regular_command_targets = commands
changes.focused_command_targets = focused_commands
changes.no_integration_paths = sorted(no_integration_paths)
return changes
class PathMapper:
"""Map file paths to test commands and targets."""
def __init__(self, args):
"""
:type args: TestConfig
"""
self.args = args
self.integration_all_target = get_integration_all_target(self.args)
self.integration_targets = list(walk_integration_targets())
self.module_targets = list(walk_module_targets())
self.compile_targets = list(walk_compile_targets())
self.units_targets = list(walk_units_targets())
self.sanity_targets = list(walk_sanity_targets())
self.powershell_targets = [target for target in self.sanity_targets if os.path.splitext(target.path)[1] in ('.ps1', '.psm1')]
self.csharp_targets = [target for target in self.sanity_targets if os.path.splitext(target.path)[1] == '.cs']
self.units_modules = set(target.module for target in self.units_targets if target.module)
self.units_paths = set(a for target in self.units_targets for a in target.aliases)
self.sanity_paths = set(target.path for target in self.sanity_targets)
self.module_names_by_path = dict((target.path, target.module) for target in self.module_targets)
self.integration_targets_by_name = dict((target.name, target) for target in self.integration_targets)
self.integration_targets_by_alias = dict((a, target) for target in self.integration_targets for a in target.aliases)
self.posix_integration_by_module = dict((m, target.name) for target in self.integration_targets
if 'posix/' in target.aliases for m in target.modules)
self.windows_integration_by_module = dict((m, target.name) for target in self.integration_targets
if 'windows/' in target.aliases for m in target.modules)
self.network_integration_by_module = dict((m, target.name) for target in self.integration_targets
if 'network/' in target.aliases for m in target.modules)
self.prefixes = load_integration_prefixes()
self.integration_dependencies = analyze_integration_target_dependencies(self.integration_targets)
self.python_module_utils_imports = {} # populated on first use to reduce overhead when not needed
self.powershell_module_utils_imports = {} # populated on first use to reduce overhead when not needed
self.csharp_module_utils_imports = {} # populated on first use to reduce overhead when not needed
self.paths_to_dependent_targets = {}
for target in self.integration_targets:
for path in target.needs_file:
if path not in self.paths_to_dependent_targets:
self.paths_to_dependent_targets[path] = set()
self.paths_to_dependent_targets[path].add(target)
def get_dependent_paths(self, path):
"""
:type path: str
:rtype: list[str]
"""
unprocessed_paths = set(self.get_dependent_paths_non_recursive(path))
paths = set()
while unprocessed_paths:
queued_paths = list(unprocessed_paths)
paths |= unprocessed_paths
unprocessed_paths = set()
for queued_path in queued_paths:
new_paths = self.get_dependent_paths_non_recursive(queued_path)
for new_path in new_paths:
if new_path not in paths:
unprocessed_paths.add(new_path)
return sorted(paths)
def get_dependent_paths_non_recursive(self, path):
"""
:type path: str
:rtype: list[str]
"""
paths = self.get_dependent_paths_internal(path)
paths += [target.path + '/' for target in self.paths_to_dependent_targets.get(path, set())]
paths = sorted(set(paths))
return paths
def get_dependent_paths_internal(self, path):
"""
:type path: str
:rtype: list[str]
"""
ext = os.path.splitext(os.path.split(path)[1])[1]
if is_subdir(path, data_context().content.module_utils_path):
if ext == '.py':
return self.get_python_module_utils_usage(path)
if ext == '.psm1':
return self.get_powershell_module_utils_usage(path)
if ext == '.cs':
return self.get_csharp_module_utils_usage(path)
if is_subdir(path, data_context().content.integration_targets_path):
return self.get_integration_target_usage(path)
return []
def get_python_module_utils_usage(self, path):
"""
:type path: str
:rtype: list[str]
"""
if not self.python_module_utils_imports:
display.info('Analyzing python module_utils imports...')
before = time.time()
self.python_module_utils_imports = get_python_module_utils_imports(self.compile_targets)
after = time.time()
display.info('Processed %d python module_utils in %d second(s).' % (len(self.python_module_utils_imports), after - before))
name = get_python_module_utils_name(path)
return sorted(self.python_module_utils_imports[name])
def get_powershell_module_utils_usage(self, path):
"""
:type path: str
:rtype: list[str]
"""
if not self.powershell_module_utils_imports:
display.info('Analyzing powershell module_utils imports...')
before = time.time()
self.powershell_module_utils_imports = get_powershell_module_utils_imports(self.powershell_targets)
after = time.time()
display.info('Processed %d powershell module_utils in %d second(s).' % (len(self.powershell_module_utils_imports), after - before))
name = get_powershell_module_utils_name(path)
return sorted(self.powershell_module_utils_imports[name])
def get_csharp_module_utils_usage(self, path):
"""
:type path: str
:rtype: list[str]
"""
if not self.csharp_module_utils_imports:
display.info('Analyzing C# module_utils imports...')
before = time.time()
self.csharp_module_utils_imports = get_csharp_module_utils_imports(self.powershell_targets, self.csharp_targets)
after = time.time()
display.info('Processed %d C# module_utils in %d second(s).' % (len(self.csharp_module_utils_imports), after - before))
name = get_csharp_module_utils_name(path)
return sorted(self.csharp_module_utils_imports[name])
def get_integration_target_usage(self, path):
"""
:type path: str
:rtype: list[str]
"""
target_name = path.split('/')[3]
dependents = [os.path.join(data_context().content.integration_targets_path, target) + os.path.sep
for target in sorted(self.integration_dependencies.get(target_name, set()))]
return dependents
def classify(self, path):
"""
:type path: str
:rtype: dict[str, str] | None
"""
result = self._classify(path)
# run all tests when no result given
if result is None:
return None
# run sanity on path unless result specified otherwise
if path in self.sanity_paths and 'sanity' not in result:
result['sanity'] = path
return result
def _classify(self, path): # type: (str) -> t.Optional[t.Dict[str, str]]
"""Return the classification for the given path."""
if data_context().content.is_ansible:
return self._classify_ansible(path)
if data_context().content.collection:
return self._classify_collection(path)
return None
def _classify_common(self, path): # type: (str) -> t.Optional[t.Dict[str, str]]
"""Return the classification for the given path using rules common to all layouts."""
dirname = os.path.dirname(path)
filename = os.path.basename(path)
name, ext = os.path.splitext(filename)
minimal = {}
if is_subdir(path, '.github'):
return minimal
if is_subdir(path, data_context().content.integration_targets_path):
if not os.path.exists(path):
return minimal
target = self.integration_targets_by_name.get(path.split('/')[3])
if not target:
display.warning('Unexpected non-target found: %s' % path)
return minimal
if 'hidden/' in target.aliases:
return minimal # already expanded using get_dependent_paths
return {
'integration': target.name if 'posix/' in target.aliases else None,
'windows-integration': target.name if 'windows/' in target.aliases else None,
'network-integration': target.name if 'network/' in target.aliases else None,
FOCUSED_TARGET: True,
}
if is_subdir(path, data_context().content.integration_path):
if dirname == data_context().content.integration_path:
for command in (
'integration',
'windows-integration',
'network-integration',
):
if name == command and ext == '.cfg':
return {
command: self.integration_all_target,
}
if name == command + '.requirements' and ext == '.txt':
return {
command: self.integration_all_target,
}
return {
'integration': self.integration_all_target,
'windows-integration': self.integration_all_target,
'network-integration': self.integration_all_target,
}
if is_subdir(path, data_context().content.sanity_path):
return {
'sanity': 'all', # test infrastructure, run all sanity checks
}
if is_subdir(path, data_context().content.unit_path):
if path in self.units_paths:
return {
'units': path,
}
# changes to files which are not unit tests should trigger tests from the nearest parent directory
test_path = os.path.dirname(path)
while test_path:
if test_path + '/' in self.units_paths:
return {
'units': test_path + '/',
}
test_path = os.path.dirname(test_path)
if is_subdir(path, data_context().content.module_path):
module_name = self.module_names_by_path.get(path)
if module_name:
return {
'units': module_name if module_name in self.units_modules else None,
'integration': self.posix_integration_by_module.get(module_name) if ext == '.py' else None,
'windows-integration': self.windows_integration_by_module.get(module_name) if ext in ['.cs', '.ps1'] else None,
'network-integration': self.network_integration_by_module.get(module_name),
FOCUSED_TARGET: True,
}
return minimal
if is_subdir(path, data_context().content.module_utils_path):
if ext == '.cs':
return minimal # already expanded using get_dependent_paths
if ext == '.psm1':
return minimal # already expanded using get_dependent_paths
if ext == '.py':
return minimal # already expanded using get_dependent_paths
if is_subdir(path, data_context().content.plugin_paths['action']):
if ext == '.py':
if name.startswith('net_'):
network_target = 'network/.*_%s' % name[4:]
if any(re.search(r'^%s$' % network_target, alias) for alias in self.integration_targets_by_alias):
return {
'network-integration': network_target,
'units': 'all',
}
return {
'network-integration': self.integration_all_target,
'units': 'all',
}
if self.prefixes.get(name) == 'network':
network_platform = name
elif name.endswith('_config') and self.prefixes.get(name[:-7]) == 'network':
network_platform = name[:-7]
elif name.endswith('_template') and self.prefixes.get(name[:-9]) == 'network':
network_platform = name[:-9]
else:
network_platform = None
if network_platform:
network_target = 'network/%s/' % network_platform
if network_target in self.integration_targets_by_alias:
return {
'network-integration': network_target,
'units': 'all',
}
display.warning('Integration tests for "%s" not found.' % network_target, unique=True)
return {
'units': 'all',
}
if is_subdir(path, data_context().content.plugin_paths['connection']):
if name == '__init__':
return {
'integration': self.integration_all_target,
'windows-integration': self.integration_all_target,
'network-integration': self.integration_all_target,
'units': 'test/units/plugins/connection/',
}
units_path = 'test/units/plugins/connection/test_%s.py' % name
if units_path not in self.units_paths:
units_path = None
integration_name = 'connection_%s' % name
if integration_name not in self.integration_targets_by_name:
integration_name = None
windows_integration_name = 'connection_windows_%s' % name
if windows_integration_name not in self.integration_targets_by_name:
windows_integration_name = None
# entire integration test commands depend on these connection plugins
if name in ['winrm', 'psrp']:
return {
'windows-integration': self.integration_all_target,
'units': units_path,
}
if name == 'local':
return {
'integration': self.integration_all_target,
'network-integration': self.integration_all_target,
'units': units_path,
}
if name == 'network_cli':
return {
'network-integration': self.integration_all_target,
'units': units_path,
}
if name == 'paramiko_ssh':
return {
'integration': integration_name,
'network-integration': self.integration_all_target,
'units': units_path,
}
# other connection plugins have isolated integration and unit tests
return {
'integration': integration_name,
'windows-integration': windows_integration_name,
'units': units_path,
}
if is_subdir(path, data_context().content.plugin_paths['doc_fragments']):
return {
'sanity': 'all',
}
if is_subdir(path, data_context().content.plugin_paths['inventory']):
if name == '__init__':
return all_tests(self.args) # broad impact, run all tests
# These inventory plugins are enabled by default (see INVENTORY_ENABLED).
# Without dedicated integration tests for these we must rely on the incidental coverage from other tests.
test_all = [
'host_list',
'script',
'yaml',
'ini',
'auto',
]
if name in test_all:
posix_integration_fallback = get_integration_all_target(self.args)
else:
posix_integration_fallback = None
target = self.integration_targets_by_name.get('inventory_%s' % name)
units_path = 'test/units/plugins/inventory/test_%s.py' % name
if units_path not in self.units_paths:
units_path = None
return {
'integration': target.name if target and 'posix/' in target.aliases else posix_integration_fallback,
'windows-integration': target.name if target and 'windows/' in target.aliases else None,
'network-integration': target.name if target and 'network/' in target.aliases else None,
'units': units_path,
FOCUSED_TARGET: target is not None,
}
if (is_subdir(path, data_context().content.plugin_paths['terminal']) or
is_subdir(path, data_context().content.plugin_paths['cliconf']) or
is_subdir(path, data_context().content.plugin_paths['netconf'])):
if ext == '.py':
if name in self.prefixes and self.prefixes[name] == 'network':
network_target = 'network/%s/' % name
if network_target in self.integration_targets_by_alias:
return {
'network-integration': network_target,
'units': 'all',
}
display.warning('Integration tests for "%s" not found.' % network_target, unique=True)
return {
'units': 'all',
}
return {
'network-integration': self.integration_all_target,
'units': 'all',
}
return None
def _classify_collection(self, path): # type: (str) -> t.Optional[t.Dict[str, str]]
"""Return the classification for the given path using rules specific to collections."""
result = self._classify_common(path)
if result is not None:
return result
return None
def _classify_ansible(self, path): # type: (str) -> t.Optional[t.Dict[str, str]]
"""Return the classification for the given path using rules specific to Ansible."""
if path.startswith('test/units/compat/'):
return {
'units': 'test/units/',
}
result = self._classify_common(path)
if result is not None:
return result
dirname = os.path.dirname(path)
filename = os.path.basename(path)
name, ext = os.path.splitext(filename)
minimal = {}
if path.startswith('bin/'):
return all_tests(self.args) # broad impact, run all tests
if path.startswith('changelogs/'):
return minimal
if path.startswith('contrib/'):
return {
'units': 'test/units/contrib/'
}
if path.startswith('docs/'):
return minimal
if path.startswith('examples/'):
if path == 'examples/scripts/ConfigureRemotingForAnsible.ps1':
return {
'windows-integration': 'connection_winrm',
}
return minimal
if path.startswith('hacking/'):
return minimal
if path.startswith('lib/ansible/executor/powershell/'):
units_path = 'test/units/executor/powershell/'
if units_path not in self.units_paths:
units_path = None
return {
'windows-integration': self.integration_all_target,
'units': units_path,
}
if path.startswith('lib/ansible/'):
return all_tests(self.args) # broad impact, run all tests
if path.startswith('licenses/'):
return minimal
if path.startswith('packaging/'):
if path.startswith('packaging/requirements/'):
if name.startswith('requirements-') and ext == '.txt':
component = name.split('-', 1)[1]
candidates = (
'cloud/%s/' % component,
)
for candidate in candidates:
if candidate in self.integration_targets_by_alias:
return {
'integration': candidate,
}
return all_tests(self.args) # broad impact, run all tests
return minimal
if path.startswith('test/ansible_test/'):
return minimal # these tests are not invoked from ansible-test
if path.startswith('test/lib/ansible_test/config/'):
if name.startswith('cloud-config-'):
# noinspection PyTypeChecker
cloud_target = 'cloud/%s/' % name.split('-')[2].split('.')[0]
if cloud_target in self.integration_targets_by_alias:
return {
'integration': cloud_target,
}
if path.startswith('test/lib/ansible_test/_data/completion/'):
if path == 'test/lib/ansible_test/_data/completion/docker.txt':
return all_tests(self.args, force=True) # force all tests due to risk of breaking changes in new test environment
if path.startswith('test/lib/ansible_test/_internal/cloud/'):
cloud_target = 'cloud/%s/' % name
if cloud_target in self.integration_targets_by_alias:
return {
'integration': cloud_target,
}
return all_tests(self.args) # test infrastructure, run all tests
if path.startswith('test/lib/ansible_test/_internal/sanity/'):
return {
'sanity': 'all', # test infrastructure, run all sanity checks
}
if path.startswith('test/lib/ansible_test/_data/sanity/'):
return {
'sanity': 'all', # test infrastructure, run all sanity checks
}
if path.startswith('test/lib/ansible_test/_internal/units/'):
return {
'units': 'all', # test infrastructure, run all unit tests
}
if path.startswith('test/lib/ansible_test/_data/units/'):
return {
'units': 'all', # test infrastructure, run all unit tests
}
if path.startswith('test/lib/ansible_test/_data/pytest/'):
return {
'units': 'all', # test infrastructure, run all unit tests
}
if path.startswith('test/lib/ansible_test/_data/requirements/'):
if name in (
'integration',
'network-integration',
'windows-integration',
):
return {
name: self.integration_all_target,
}
if name in (
'sanity',
'units',
):
return {
name: 'all',
}
if name.startswith('integration.cloud.'):
cloud_target = 'cloud/%s/' % name.split('.')[2]
if cloud_target in self.integration_targets_by_alias:
return {
'integration': cloud_target,
}
if path.startswith('test/lib/'):
return all_tests(self.args) # test infrastructure, run all tests
if path.startswith('test/support/'):
return all_tests(self.args) # test infrastructure, run all tests
if path.startswith('test/utils/shippable/'):
if dirname == 'test/utils/shippable':
test_map = {
'cloud.sh': 'integration:cloud/',
'linux.sh': 'integration:all',
'network.sh': 'network-integration:all',
'remote.sh': 'integration:all',
'sanity.sh': 'sanity:all',
'units.sh': 'units:all',
'windows.sh': 'windows-integration:all',
}
test_match = test_map.get(filename)
if test_match:
test_command, test_target = test_match.split(':')
return {
test_command: test_target,
}
cloud_target = 'cloud/%s/' % name
if cloud_target in self.integration_targets_by_alias:
return {
'integration': cloud_target,
}
return all_tests(self.args) # test infrastructure, run all tests
if path.startswith('test/utils/'):
return minimal
if '/' not in path:
if path in (
'.gitattributes',
'.gitignore',
'.mailmap',
'COPYING',
'Makefile',
):
return minimal
if path in (
'setup.py',
'shippable.yml',
):
return all_tests(self.args) # broad impact, run all tests
if ext in (
'.in',
'.md',
'.rst',
'.toml',
'.txt',
):
return minimal
return None # unknown, will result in fall-back to run all tests
def all_tests(args, force=False):
"""
:type args: TestConfig
:type force: bool
:rtype: dict[str, str]
"""
if force:
integration_all_target = 'all'
else:
integration_all_target = get_integration_all_target(args)
return {
'sanity': 'all',
'units': 'all',
'integration': integration_all_target,
'windows-integration': integration_all_target,
'network-integration': integration_all_target,
}
def get_integration_all_target(args):
"""
:type args: TestConfig
:rtype: str
"""
if isinstance(args, IntegrationConfig):
return args.changed_all_target
return 'all'