Initial ansible-test support for collections. (#59197)

* Initial ansible-test support for collections.
* Include cloud config in delegation payload.
* Add missing types import and fix `t` shadowing.
* Fix plugin traceback when config_path not set.
* Fix encoding issues.
* Remove unused imports.
* More encoding fixes.
* Handle delegation outside exception handler.
* Inject ssh keys only if not already in place.
* More defensive approach to getting remote pwd.
* Add missing string format var.
* Correct PowerShell require regex.
* Rename `is_install` and `INSTALL_ROOT`.
This commit is contained in:
Matt Clay 2019-07-22 19:24:48 -07:00 committed by GitHub
parent 67c69f3540
commit 79eca9c8fb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
46 changed files with 1528 additions and 383 deletions

View file

@ -14,7 +14,7 @@ from lib.util import (
display,
find_python,
ApplicationError,
INSTALL_ROOT,
ANSIBLE_ROOT,
)
from lib.util_common import (
@ -26,6 +26,9 @@ from lib.config import (
EnvironmentConfig,
)
from lib.data import (
data_context,
)
CHECK_YAML_VERSIONS = {}
@ -40,7 +43,7 @@ def ansible_environment(args, color=True, ansible_config=None):
env = common_environment()
path = env['PATH']
ansible_path = os.path.join(INSTALL_ROOT, 'bin')
ansible_path = os.path.join(ANSIBLE_ROOT, 'bin')
if not path.startswith(ansible_path + os.path.pathsep):
path = ansible_path + os.path.pathsep + path
@ -48,9 +51,9 @@ def ansible_environment(args, color=True, ansible_config=None):
if ansible_config:
pass
elif isinstance(args, IntegrationConfig):
ansible_config = os.path.join(INSTALL_ROOT, 'test/integration/%s.cfg' % args.command)
ansible_config = os.path.join(ANSIBLE_ROOT, 'test/integration/%s.cfg' % args.command)
else:
ansible_config = os.path.join(INSTALL_ROOT, 'test/%s/ansible.cfg' % args.command)
ansible_config = os.path.join(ANSIBLE_ROOT, 'test/%s/ansible.cfg' % args.command)
if not args.explain and not os.path.exists(ansible_config):
raise ApplicationError('Configuration not found: %s' % ansible_config)
@ -63,7 +66,7 @@ def ansible_environment(args, color=True, ansible_config=None):
ANSIBLE_RETRY_FILES_ENABLED='false',
ANSIBLE_CONFIG=os.path.abspath(ansible_config),
ANSIBLE_LIBRARY='/dev/null',
PYTHONPATH=os.path.join(INSTALL_ROOT, 'lib'),
PYTHONPATH=os.path.join(ANSIBLE_ROOT, 'lib'),
PAGER='/bin/cat',
PATH=path,
)
@ -76,6 +79,11 @@ def ansible_environment(args, color=True, ansible_config=None):
ANSIBLE_LOG_PATH=os.path.abspath('test/results/logs/debug.log'),
))
if data_context().content.collection:
env.update(dict(
ANSIBLE_COLLECTIONS_PATHS=data_context().content.collection.root,
))
return env
@ -88,7 +96,7 @@ def check_pyyaml(args, version):
return
python = find_python(version)
stdout, _dummy = run_command(args, [python, os.path.join(INSTALL_ROOT, 'test/runner/yamlcheck.py')], capture=True)
stdout, _dummy = run_command(args, [python, os.path.join(ANSIBLE_ROOT, 'test/runner/yamlcheck.py')], capture=True)
if args.explain:
return

View file

@ -19,18 +19,22 @@ from lib.target import (
from lib.util import (
display,
is_subdir,
)
from lib.import_analysis import (
get_python_module_utils_imports,
get_python_module_utils_name,
)
from lib.csharp_import_analysis import (
get_csharp_module_utils_imports,
get_csharp_module_utils_name,
)
from lib.powershell_import_analysis import (
get_powershell_module_utils_imports,
get_powershell_module_utils_name,
)
from lib.config import (
@ -42,6 +46,10 @@ from lib.metadata import (
ChangeDescription,
)
from lib.data import (
data_context,
)
FOCUSED_TARGET = '__focused__'
@ -184,7 +192,7 @@ class PathMapper:
self.compile_targets = list(walk_compile_targets())
self.units_targets = list(walk_units_targets())
self.sanity_targets = list(walk_sanity_targets())
self.powershell_targets = [t for t in self.sanity_targets if os.path.splitext(t.path)[1] == '.ps1']
self.powershell_targets = [t for t in self.sanity_targets if os.path.splitext(t.path)[1] in ('.ps1', '.psm1')]
self.csharp_targets = [t for t in self.sanity_targets if os.path.splitext(t.path)[1] == '.cs']
self.units_modules = set(t.module for t in self.units_targets if t.module)
@ -258,7 +266,7 @@ class PathMapper:
"""
ext = os.path.splitext(os.path.split(path)[1])[1]
if path.startswith('lib/ansible/module_utils/'):
if is_subdir(path, data_context().content.module_utils_path):
if ext == '.py':
return self.get_python_module_utils_usage(path)
@ -288,10 +296,7 @@ class PathMapper:
after = time.time()
display.info('Processed %d python module_utils in %d second(s).' % (len(self.python_module_utils_imports), after - before))
name = os.path.splitext(path)[0].replace('/', '.')[4:]
if name.endswith('.__init__'):
name = name[:-9]
name = get_python_module_utils_name(path)
return sorted(self.python_module_utils_imports[name])
@ -307,7 +312,7 @@ class PathMapper:
after = time.time()
display.info('Processed %d powershell module_utils in %d second(s).' % (len(self.powershell_module_utils_imports), after - before))
name = os.path.splitext(os.path.basename(path))[0]
name = get_powershell_module_utils_name(path)
return sorted(self.powershell_module_utils_imports[name])
@ -323,7 +328,7 @@ class PathMapper:
after = time.time()
display.info('Processed %d C# module_utils in %d second(s).' % (len(self.csharp_module_utils_imports), after - before))
name = os.path.splitext(os.path.basename(path))[0]
name = get_csharp_module_utils_name(path)
return sorted(self.csharp_module_utils_imports[name])

View file

@ -21,7 +21,6 @@ from lib.util import (
generate_pip_command,
read_lines_without_comments,
MAXFD,
INSTALL_ROOT,
)
from lib.delegation import (
@ -81,17 +80,25 @@ from lib.cloud import (
initialize_cloud_plugins,
)
from lib.data import (
data_context,
)
from lib.util_common import (
CommonConfig,
)
import lib.cover
def main():
"""Main program function."""
try:
os.chdir(INSTALL_ROOT)
os.chdir(data_context().content.root)
initialize_cloud_plugins()
sanity_init()
args = parse_args()
config = args.config(args)
config = args.config(args) # type: CommonConfig
display.verbosity = config.verbosity
display.truncate = config.truncate
display.redact = config.redact
@ -106,8 +113,13 @@ def main():
try:
args.func(config)
delegate_args = None
except Delegate as ex:
delegate(config, ex.exclude, ex.require, ex.integration_targets)
# save delegation args for use once we exit the exception handler
delegate_args = (ex.exclude, ex.require, ex.integration_targets)
if delegate_args:
delegate(config, *delegate_args)
display.review_warnings()
except ApplicationWarning as ex:
@ -614,6 +626,7 @@ def add_environments(parser, tox_version=False, tox_only=False):
action='store_true',
help='run from the local environment')
if data_context().content.is_ansible:
if tox_version:
environments.add_argument('--tox',
metavar='VERSION',
@ -632,6 +645,11 @@ def add_environments(parser, tox_version=False, tox_only=False):
tox.add_argument('--tox-sitepackages',
action='store_true',
help='allow access to globally installed packages')
else:
environments.set_defaults(
tox=None,
tox_sitepackages=False,
)
if tox_only:
environments.set_defaults(
@ -739,9 +757,14 @@ def add_extra_docker_options(parser, integration=True):
dest='docker_pull',
help='do not explicitly pull the latest docker images')
if data_context().content.is_ansible:
docker.add_argument('--docker-keep-git',
action='store_true',
help='transfer git related files into the docker container')
else:
docker.set_defaults(
docker_keep_git=False,
)
docker.add_argument('--docker-seccomp',
metavar='SC',
@ -848,10 +871,10 @@ def complete_network_testcase(prefix, parsed_args, **_):
return []
test_dir = 'test/integration/targets/%s/tests' % parsed_args.include[0]
connection_dirs = [path for path in [os.path.join(test_dir, name) for name in os.listdir(test_dir)] if os.path.isdir(path)]
connection_dirs = data_context().content.get_dirs(test_dir)
for connection_dir in connection_dirs:
for testcase in os.listdir(connection_dir):
for testcase in [os.path.basename(path) for path in data_context().content.get_files(connection_dir)]:
if testcase.startswith(prefix):
testcases.append(testcase.split('.')[0])

View file

@ -13,6 +13,8 @@ import random
import re
import tempfile
import lib.types as t
from lib.util import (
ApplicationError,
display,
@ -20,6 +22,7 @@ from lib.util import (
import_plugins,
load_plugins,
ABC,
to_bytes,
)
from lib.target import (
@ -30,6 +33,10 @@ from lib.config import (
IntegrationConfig,
)
from lib.data import (
data_context,
)
PROVIDERS = {}
ENVIRONMENTS = {}
@ -55,7 +62,7 @@ def get_cloud_platforms(args, targets=None):
if targets is None:
cloud_platforms = set(args.metadata.cloud_config or [])
else:
cloud_platforms = set(get_cloud_platform(t) for t in targets)
cloud_platforms = set(get_cloud_platform(target) for target in targets)
cloud_platforms.discard(None)
@ -145,7 +152,7 @@ def cloud_init(args, targets):
results[provider.platform] = dict(
platform=provider.platform,
setup_seconds=int(end_time - start_time),
targets=[t.name for t in targets],
targets=[target.name for target in targets],
)
if not args.explain and results:
@ -175,6 +182,17 @@ class CloudBase(ABC):
self.args = args
self.platform = self.__module__.split('.')[2]
def config_callback(files): # type: (t.List[t.Tuple[str, str]]) -> None
"""Add the config file to the payload file list."""
if self._get_cloud_config(self._CONFIG_PATH, ''):
pair = (self.config_path, os.path.relpath(self.config_path, data_context().content.root))
if pair not in files:
display.info('Including %s config: %s -> %s' % (self.platform, pair[0], pair[1]), verbosity=3)
files.append(pair)
data_context().register_payload_callback(config_callback)
@property
def setup_executed(self):
"""
@ -194,7 +212,7 @@ class CloudBase(ABC):
"""
:rtype: str
"""
return os.path.join(os.getcwd(), self._get_cloud_config(self._CONFIG_PATH))
return os.path.join(data_context().content.root, self._get_cloud_config(self._CONFIG_PATH))
@config_path.setter
def config_path(self, value):
@ -334,7 +352,7 @@ class CloudProvider(CloudBase):
display.info('>>> Config: %s\n%s' % (filename, content.strip()), verbosity=3)
config_fd.write(content.encode('utf-8'))
config_fd.write(to_bytes(content))
config_fd.flush()
def _read_config_template(self):

View file

@ -24,6 +24,10 @@ from lib.metadata import (
Metadata,
)
from lib.data import (
data_context,
)
class EnvironmentConfig(CommonConfig):
"""Configuration common to all commands which execute in an environment."""
@ -90,6 +94,15 @@ class EnvironmentConfig(CommonConfig):
if args.check_python and args.check_python != actual_major_minor:
raise ApplicationError('Running under Python %s instead of Python %s as expected.' % (actual_major_minor, args.check_python))
if self.docker_keep_git:
def git_callback(files): # type: (t.List[t.Tuple[str, str]]) -> None
"""Add files from the content root .git directory to the payload file list."""
for dirpath, _dirnames, filenames in os.walk(os.path.join(data_context().content.root, '.git')):
paths = [os.path.join(dirpath, filename) for filename in filenames]
files.extend((path, os.path.relpath(path, data_context().content.root)) for path in paths)
data_context().register_payload_callback(git_callback)
@property
def python_executable(self):
"""
@ -141,6 +154,20 @@ class TestConfig(EnvironmentConfig):
if self.coverage_check:
self.coverage = True
def metadata_callback(files): # type: (t.List[t.Tuple[str, str]]) -> None
"""Add the metadata file to the payload file list."""
config = self
if data_context().content.collection:
working_path = data_context().content.collection.directory
else:
working_path = ''
if self.metadata_path:
files.append((os.path.abspath(config.metadata_path), os.path.join(working_path, config.metadata_path)))
data_context().register_payload_callback(metadata_callback)
class ShellConfig(EnvironmentConfig):
"""Configuration for the shell command."""

View file

@ -8,7 +8,8 @@ import traceback
import uuid
import errno
import time
import shutil
import lib.types as t
from lib.http import (
HttpClient,
@ -21,6 +22,7 @@ from lib.util import (
make_dirs,
display,
is_shippable,
to_text,
)
from lib.util_common import (
@ -31,6 +33,10 @@ from lib.config import (
EnvironmentConfig,
)
from lib.data import (
data_context,
)
AWS_ENDPOINTS = {
'us-east-1': 'https://14blg63h2i.execute-api.us-east-1.amazonaws.com',
'us-east-2': 'https://g5xynwbk96.execute-api.us-east-2.amazonaws.com',
@ -342,7 +348,7 @@ class AnsibleCoreCI:
if self.platform == 'windows':
with open('examples/scripts/ConfigureRemotingForAnsible.ps1', 'rb') as winrm_config_fd:
winrm_config = winrm_config_fd.read().decode('utf-8')
winrm_config = to_text(winrm_config_fd.read())
else:
winrm_config = None
@ -546,11 +552,14 @@ class SshKey:
"""
:type args: EnvironmentConfig
"""
cache_dir = 'test/cache'
cache_dir = os.path.join(data_context().content.root, 'test/cache')
self.key = os.path.join(cache_dir, self.KEY_NAME)
self.pub = os.path.join(cache_dir, self.PUB_NAME)
key_dst = os.path.relpath(self.key, data_context().content.root)
pub_dst = os.path.relpath(self.pub, data_context().content.root)
if not os.path.isfile(self.key) or not os.path.isfile(self.pub):
base_dir = os.path.expanduser('~/.ansible/test/')
@ -563,9 +572,15 @@ class SshKey:
if not os.path.isfile(key) or not os.path.isfile(pub):
run_command(args, ['ssh-keygen', '-m', 'PEM', '-q', '-t', 'rsa', '-N', '', '-f', key])
if not args.explain:
shutil.copy2(key, self.key)
shutil.copy2(pub, self.pub)
self.key = key
self.pub = pub
def ssh_key_callback(files): # type: (t.List[t.Tuple[str, str]]) -> None
"""Add the SSH keys to the payload file list."""
files.append((key, key_dst))
files.append((pub, pub_dst))
data_context().register_payload_callback(ssh_key_callback)
if args.explain:
self.pub_contents = None

View file

@ -30,6 +30,10 @@ from lib.executor import (
install_command_requirements,
)
from lib.data import (
data_context,
)
COVERAGE_DIR = 'test/results/coverage'
COVERAGE_FILE = os.path.join(COVERAGE_DIR, 'coverage')
COVERAGE_GROUPS = ('command', 'target', 'environment', 'version')
@ -47,7 +51,7 @@ def command_coverage_combine(args):
coverage_files = [os.path.join(COVERAGE_DIR, f) for f in os.listdir(COVERAGE_DIR) if '=coverage.' in f]
ansible_path = os.path.abspath('lib/ansible/') + '/'
root_path = os.getcwd() + '/'
root_path = data_context().content.root + '/'
counter = 0
groups = {}
@ -81,6 +85,13 @@ def command_coverage_combine(args):
groups['=stub-%02d' % (stub_index + 1)] = dict((source, set()) for source in stub_group)
if data_context().content.collection:
collection_search_re = re.compile(r'/%s/' % data_context().content.collection.directory)
collection_sub_re = re.compile(r'^.*?/%s/' % data_context().content.collection.directory)
else:
collection_search_re = None
collection_sub_re = None
for coverage_file in coverage_files:
counter += 1
display.info('[%4d/%4d] %s' % (counter, len(coverage_files), coverage_file), verbosity=2)
@ -116,6 +127,10 @@ def command_coverage_combine(args):
new_name = re.sub('^.*/ansible_modlib.zip/ansible/', ansible_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif collection_search_re and collection_search_re.search(filename):
new_name = os.path.abspath(collection_sub_re.sub('', filename))
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search(r'/ansible_[^/]+_payload\.zip/ansible/', filename):
# Rewrite the module_utils path from the remote host to match the controller. Ansible 2.7 and later.
new_name = re.sub(r'^.*/ansible_[^/]+_payload\.zip/ansible/', ansible_path, filename)

View file

@ -0,0 +1,89 @@
"""Utility code for facilitating collection of code coverage when running tests."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import contextlib
import os
import tempfile
from lib.config import (
IntegrationConfig,
SanityConfig,
TestConfig,
)
from lib.util import (
COVERAGE_CONFIG_PATH,
remove_tree,
)
from lib.data import (
data_context,
)
@contextlib.contextmanager
def coverage_context(args): # type: (TestConfig) -> None
"""Content to set up and clean up code coverage configuration for tests."""
coverage_setup(args)
try:
yield
finally:
coverage_cleanup(args)
def coverage_setup(args): # type: (TestConfig) -> None
"""Set up code coverage configuration before running tests."""
if args.coverage and data_context().content.collection:
coverage_config = generate_collection_coverage_config(args)
if args.explain:
args.coverage_config_base_path = '/tmp/coverage-temp-dir'
else:
args.coverage_config_base_path = tempfile.mkdtemp()
with open(os.path.join(args.coverage_config_base_path, COVERAGE_CONFIG_PATH), 'w') as coverage_config_path_fd:
coverage_config_path_fd.write(coverage_config)
def coverage_cleanup(args): # type: (TestConfig) -> None
"""Clean up code coverage configuration after tests have finished."""
if args.coverage_config_base_path and not args.explain:
remove_tree(args.coverage_config_base_path)
args.coverage_config_base_path = None
def generate_collection_coverage_config(args): # type: (TestConfig) -> str
"""Generate code coverage configuration for tests."""
coverage_config = '''
[run]
branch = True
concurrency = multiprocessing
parallel = True
disable_warnings =
no-data-collected
'''
if isinstance(args, IntegrationConfig):
coverage_config += '''
include =
%s/*
*/%s/*
''' % (data_context().content.root, data_context().content.collection.directory)
elif isinstance(args, SanityConfig):
# temporary work-around for import sanity test
coverage_config += '''
include =
%s/*
omit =
*/test/runner/.tox/*
''' % data_context().content.root
else:
coverage_config += '''
include =
%s/*
''' % data_context().content.root
return coverage_config

View file

@ -9,6 +9,10 @@ from lib.util import (
display,
)
from lib.data import (
data_context,
)
def get_csharp_module_utils_imports(powershell_targets, csharp_targets):
"""Return a dictionary of module_utils names mapped to sets of powershell file paths.
@ -40,11 +44,27 @@ def get_csharp_module_utils_imports(powershell_targets, csharp_targets):
return imports
def get_csharp_module_utils_name(path): # type: (str) -> str
"""Return a namespace and name from the given module_utils path."""
base_path = data_context().content.module_utils_csharp_path
if data_context().content.collection:
prefix = 'AnsibleCollections.' + data_context().content.collection.prefix
else:
prefix = ''
name = prefix + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.sep, '.')
return name
def enumerate_module_utils():
"""Return a list of available module_utils imports.
:rtype: set[str]
"""
return set(os.path.splitext(p)[0] for p in os.listdir('lib/ansible/module_utils/csharp') if os.path.splitext(p)[1] == '.cs')
return set(get_csharp_module_utils_name(p)
for p in data_context().content.walk_files(data_context().content.module_utils_csharp_path)
if os.path.splitext(p)[1] == '.cs')
def extract_csharp_module_utils_imports(path, module_utils, is_pure_csharp):
@ -56,9 +76,9 @@ def extract_csharp_module_utils_imports(path, module_utils, is_pure_csharp):
"""
imports = set()
if is_pure_csharp:
pattern = re.compile(r'(?i)^using\s(Ansible\..+);$')
pattern = re.compile(r'(?i)^using\s((?:Ansible|AnsibleCollections)\..+);$')
else:
pattern = re.compile(r'(?i)^#\s*ansiblerequires\s+-csharputil\s+(Ansible\..+)')
pattern = re.compile(r'(?i)^#\s*ansiblerequires\s+-csharputil\s+((?:Ansible|AnsibleCollections)\..+)')
with open(path, 'r') as module_file:
for line_number, line in enumerate(module_file, 1):

148
test/runner/lib/data.py Normal file
View file

@ -0,0 +1,148 @@
"""Context information for the current invocation of ansible-test."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import lib.types as t
from lib.util import (
ApplicationError,
import_plugins,
ANSIBLE_ROOT,
is_subdir,
)
from lib.provider import (
find_path_provider,
get_path_provider_classes,
ProviderNotFoundForPath,
)
from lib.provider.source import (
SourceProvider,
)
from lib.provider.source.unversioned import (
UnversionedSource,
)
from lib.provider.layout import (
ContentLayout,
InstallLayout,
LayoutProvider,
)
class UnexpectedSourceRoot(ApplicationError):
"""Exception generated when a source root is found below a layout root."""
def __init__(self, source_root, layout_root): # type: (str, str) -> None
super(UnexpectedSourceRoot, self).__init__('Source root "%s" cannot be below layout root "%s".' % (source_root, layout_root))
self.source_root = source_root
self.layout_root = layout_root
class DataContext:
"""Data context providing details about the current execution environment for ansible-test."""
def __init__(self):
content_path = os.environ.get('ANSIBLE_TEST_CONTENT_ROOT')
current_path = os.getcwd()
self.__layout_providers = get_path_provider_classes(LayoutProvider)
self.__source_providers = get_path_provider_classes(SourceProvider)
self.payload_callbacks = [] # type: t.List[t.Callable[t.List[t.Tuple[str, str]], None]]
if content_path:
content = self.create_content_layout(self.__layout_providers, self.__source_providers, content_path, False)
if content.is_ansible:
install = content
else:
install = None
elif is_subdir(current_path, ANSIBLE_ROOT):
content = self.create_content_layout(self.__layout_providers, self.__source_providers, ANSIBLE_ROOT, False)
install = InstallLayout(ANSIBLE_ROOT, content.all_files())
else:
content = self.create_content_layout(self.__layout_providers, self.__source_providers, current_path, True)
install = None
self.__install = install # type: t.Optional[InstallLayout]
self.content = content # type: ContentLayout
@staticmethod
def create_content_layout(layout_providers, # type: t.List[t.Type[LayoutProvider]]
source_providers, # type: t.List[t.Type[SourceProvider]]
root, # type: str
walk, # type: bool
): # type: (...) -> ContentLayout
"""Create a content layout using the given providers and root path."""
layout_provider = find_path_provider(LayoutProvider, layout_providers, root, walk)
try:
source_provider = find_path_provider(SourceProvider, source_providers, root, walk)
except ProviderNotFoundForPath:
source_provider = UnversionedSource(layout_provider.root)
if source_provider.root != layout_provider.root and is_subdir(source_provider.root, layout_provider.root):
raise UnexpectedSourceRoot(source_provider.root, layout_provider.root)
layout = layout_provider.create(layout_provider.root, source_provider.get_paths(layout_provider.root))
return layout
@staticmethod
def create_install_layout(source_providers): # type: (t.List[t.Type[SourceProvider]]) -> InstallLayout
"""Create an install layout using the given source provider."""
try:
source_provider = find_path_provider(SourceProvider, source_providers, ANSIBLE_ROOT, False)
except ProviderNotFoundForPath:
source_provider = UnversionedSource(ANSIBLE_ROOT)
paths = source_provider.get_paths(ANSIBLE_ROOT)
return InstallLayout(ANSIBLE_ROOT, paths)
@property
def install(self): # type: () -> InstallLayout
"""Return the install context, loaded on demand."""
if not self.__install:
self.__install = self.create_install_layout(self.__source_providers)
return self.__install
def register_payload_callback(self, callback): # type: (t.Callable[t.List[t.Tuple[str, str]], None]) -> None
"""Register the given payload callback."""
self.payload_callbacks.append(callback)
def data_init(): # type: () -> DataContext
"""Initialize provider plugins."""
provider_types = (
'layout',
'source',
)
for provider_type in provider_types:
import_plugins('provider/%s' % provider_type)
try:
context = DataContext()
except ProviderNotFoundForPath:
raise ApplicationError('''The current working directory must be at or below one of:
- Ansible source: %s/
- Ansible collection: {...}/ansible_collections/{namespace}/{collection}/
Current working directory: %s''' % (ANSIBLE_ROOT, os.getcwd()))
return context
def data_context(): # type: () -> DataContext
"""Return the current data context."""
try:
return data_context.instance
except AttributeError:
data_context.instance = data_init()
return data_context.instance

View file

@ -46,7 +46,7 @@ from lib.util import (
from lib.util_common import (
run_command,
INSTALL_ROOT,
ANSIBLE_ROOT,
)
from lib.docker_util import (
@ -69,6 +69,10 @@ from lib.target import (
IntegrationTarget,
)
from lib.data import (
data_context,
)
from lib.payload import (
create_payload,
)
@ -96,7 +100,7 @@ def delegate(args, exclude, require, integration_targets):
:rtype: bool
"""
if isinstance(args, TestConfig):
with tempfile.NamedTemporaryFile(prefix='metadata-', suffix='.json', dir=os.getcwd()) as metadata_fd:
with tempfile.NamedTemporaryFile(prefix='metadata-', suffix='.json', dir=data_context().content.root) as metadata_fd:
args.metadata_path = os.path.basename(metadata_fd.name)
args.metadata.to_file(args.metadata_path)
@ -165,7 +169,7 @@ def delegate_tox(args, exclude, require, integration_targets):
tox.append('--')
cmd = generate_command(args, None, INSTALL_ROOT, INSTALL_ROOT, options, exclude, require)
cmd = generate_command(args, None, ANSIBLE_ROOT, data_context().content.root, options, exclude, require)
if not args.python:
cmd += ['--python', version]
@ -228,6 +232,10 @@ def delegate_docker(args, exclude, require, integration_targets):
python_interpreter = get_python_interpreter(args, get_docker_completion(), args.docker_raw)
install_root = '/root/ansible'
if data_context().content.collection:
content_root = os.path.join(install_root, data_context().content.collection.directory)
else:
content_root = install_root
cmd = generate_command(args, python_interpreter, install_root, content_root, options, exclude, require)
@ -296,7 +304,7 @@ def delegate_docker(args, exclude, require, integration_targets):
test_id = test_id.strip()
# write temporary files to /root since /tmp isn't ready immediately on container start
docker_put(args, test_id, os.path.join(INSTALL_ROOT, 'test/runner/setup/docker.sh'), '/root/docker.sh')
docker_put(args, test_id, os.path.join(ANSIBLE_ROOT, 'test/runner/setup/docker.sh'), '/root/docker.sh')
docker_exec(args, test_id, ['/bin/bash', '/root/docker.sh'])
docker_put(args, test_id, local_source_fd.name, '/root/ansible.tgz')
docker_exec(args, test_id, ['mkdir', '/root/ansible'])
@ -310,12 +318,17 @@ def delegate_docker(args, exclude, require, integration_targets):
# also disconnect from the network once requirements have been installed
if isinstance(args, UnitsConfig):
writable_dirs = [
os.path.join(content_root, '.pytest_cache'),
os.path.join(install_root, '.pytest_cache'),
]
if content_root != install_root:
writable_dirs.append(os.path.join(content_root, 'test/results/junit'))
writable_dirs.append(os.path.join(content_root, 'test/results/coverage'))
docker_exec(args, test_id, ['mkdir', '-p'] + writable_dirs)
docker_exec(args, test_id, ['chmod', '777'] + writable_dirs)
if content_root == install_root:
docker_exec(args, test_id, ['find', os.path.join(content_root, 'test/results/'), '-type', 'd', '-exec', 'chmod', '777', '{}', '+'])
docker_exec(args, test_id, ['chmod', '755', '/root'])
@ -387,21 +400,34 @@ def delegate_remote(args, exclude, require, integration_targets):
core_ci.wait()
python_version = get_python_version(args, get_remote_completion(), args.remote)
if platform == 'windows':
# Windows doesn't need the ansible-test fluff, just run the SSH command
manage = ManageWindowsCI(core_ci)
manage.setup(python_version)
cmd = ['powershell.exe']
elif raw:
manage = ManagePosixCI(core_ci)
manage.setup(python_version)
cmd = create_shell_command(['bash'])
else:
manage = ManagePosixCI(core_ci)
pwd = manage.setup(python_version)
options = {
'--remote': 1,
}
python_interpreter = get_python_interpreter(args, get_remote_completion(), args.remote)
install_root = 'ansible'
install_root = os.path.join(pwd, 'ansible')
if data_context().content.collection:
content_root = os.path.join(install_root, data_context().content.collection.directory)
else:
content_root = install_root
cmd = generate_command(args, python_interpreter, install_root, content_root, options, exclude, require)
@ -421,11 +447,6 @@ def delegate_remote(args, exclude, require, integration_targets):
if isinstance(args, UnitsConfig) and not args.python:
cmd += ['--python', 'default']
manage = ManagePosixCI(core_ci)
python_version = get_python_version(args, get_remote_completion(), args.remote)
manage.setup(python_version)
if isinstance(args, IntegrationConfig):
cloud_platforms = get_cloud_providers(args)

View file

@ -58,7 +58,7 @@ from lib.util import (
get_remote_completion,
COVERAGE_OUTPUT_PATH,
cmd_quote,
INSTALL_ROOT,
ANSIBLE_ROOT,
)
from lib.util_common import (
@ -125,6 +125,14 @@ from lib.integration import (
setup_common_temp_dir,
)
from lib.coverage_util import (
coverage_context,
)
from lib.data import (
data_context,
)
SUPPORTED_PYTHON_VERSIONS = (
'2.6',
'2.7',
@ -179,6 +187,10 @@ def install_command_requirements(args, python_version=None):
:type args: EnvironmentConfig
:type python_version: str | None
"""
if not args.explain:
make_dirs('test/results/coverage')
make_dirs('test/results/data')
if isinstance(args, ShellConfig):
if args.raw:
return
@ -278,13 +290,13 @@ def generate_egg_info(args):
"""
:type args: EnvironmentConfig
"""
if not os.path.exists(os.path.join(INSTALL_ROOT, 'setup.py')):
if not os.path.exists(os.path.join(ANSIBLE_ROOT, 'setup.py')):
return
if os.path.isdir(os.path.join(INSTALL_ROOT, 'lib/ansible.egg-info')):
if os.path.isdir(os.path.join(ANSIBLE_ROOT, 'lib/ansible.egg-info')):
return
run_command(args, [args.python_executable, 'setup.py', 'egg_info'], cwd=INSTALL_ROOT, capture=args.verbosity < 3)
run_command(args, [args.python_executable, 'setup.py', 'egg_info'], cwd=ANSIBLE_ROOT, capture=args.verbosity < 3)
def generate_pip_install(pip, command, packages=None):
@ -294,8 +306,8 @@ def generate_pip_install(pip, command, packages=None):
:type packages: list[str] | None
:rtype: list[str] | None
"""
constraints = os.path.join(INSTALL_ROOT, 'test/runner/requirements/constraints.txt')
requirements = os.path.join(INSTALL_ROOT, 'test/runner/requirements/%s.txt' % command)
constraints = os.path.join(ANSIBLE_ROOT, 'test/runner/requirements/constraints.txt')
requirements = os.path.join(ANSIBLE_ROOT, 'test/runner/requirements/%s.txt' % command)
options = []
@ -1345,8 +1357,11 @@ def command_units(args):
if args.coverage:
plugins.append('ansible_pytest_coverage')
if data_context().content.collection:
plugins.append('ansible_pytest_collections')
if plugins:
env['PYTHONPATH'] += ':%s' % os.path.join(INSTALL_ROOT, 'test/units/pytest/plugins')
env['PYTHONPATH'] += ':%s' % os.path.join(ANSIBLE_ROOT, 'test/units/pytest/plugins')
for plugin in plugins:
cmd.extend(['-p', plugin])
@ -1370,6 +1385,7 @@ def command_units(args):
display.info('Unit test with Python %s' % version)
try:
with coverage_context(args):
intercept_command(args, command, target_name='units', env=env, python_version=version)
except SubprocessError as ex:
# pytest exits with status code 5 when all tests are skipped, which isn't an error for our use case
@ -1815,7 +1831,7 @@ class EnvironmentDescription:
versions += SUPPORTED_PYTHON_VERSIONS
versions += list(set(v.split('.')[0] for v in SUPPORTED_PYTHON_VERSIONS))
version_check = os.path.join(INSTALL_ROOT, 'test/runner/versions.py')
version_check = os.path.join(ANSIBLE_ROOT, 'test/runner/versions.py')
python_paths = dict((v, find_executable('python%s' % v, required=False)) for v in sorted(versions))
pip_paths = dict((v, find_executable('pip%s' % v, required=False)) for v in sorted(versions))
program_versions = dict((v, self.get_version([python_paths[v], version_check], warnings)) for v in sorted(python_paths) if python_paths[v])

View file

@ -10,6 +10,10 @@ from lib.util import (
ApplicationError,
)
from lib.data import (
data_context,
)
VIRTUAL_PACKAGES = set([
'ansible.module_utils.six',
])
@ -22,6 +26,7 @@ def get_python_module_utils_imports(compile_targets):
"""
module_utils = enumerate_module_utils()
virtual_utils = set(m for m in module_utils if any(m.startswith('%s.' % v) for v in VIRTUAL_PACKAGES))
module_utils -= virtual_utils
@ -115,34 +120,39 @@ def get_python_module_utils_imports(compile_targets):
return imports
def get_python_module_utils_name(path): # type: (str) -> str
"""Return a namespace and name from the given module_utils path."""
base_path = data_context().content.module_utils_path
if data_context().content.collection:
prefix = 'ansible_collections.' + data_context().content.collection.prefix
else:
prefix = 'ansible.module_utils.'
if path.endswith('/__init__.py'):
path = os.path.dirname(path)
name = prefix + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.sep, '.')
return name
def enumerate_module_utils():
"""Return a list of available module_utils imports.
:rtype: set[str]
"""
module_utils = []
base_path = 'lib/ansible/module_utils'
paths = []
for path in data_context().content.walk_files(data_context().content.module_utils_path):
ext = os.path.splitext(path)[1]
for root, _dir_names, file_names in os.walk(base_path):
for file_name in file_names:
paths.append(os.path.join(root, file_name))
for path in paths:
name, ext = os.path.splitext(path)
if path == 'lib/ansible/module_utils/__init__.py':
if path == os.path.join(data_context().content.module_utils_path, '__init__.py'):
continue
if ext != '.py':
continue
if name.endswith('/__init__'):
module_util = os.path.dirname(name)
else:
module_util = name
module_utils.append(module_util[4:].replace('/', '.'))
module_utils.append(get_python_module_utils_name(path))
return set(module_utils)

View file

@ -28,13 +28,18 @@ from lib.util import (
MODE_DIRECTORY,
MODE_DIRECTORY_WRITE,
MODE_FILE,
INSTALL_ROOT,
ANSIBLE_ROOT,
to_bytes,
)
from lib.util_common import (
named_temporary_file,
)
from lib.coverage_util import (
generate_collection_coverage_config,
)
from lib.cache import (
CommonCache,
)
@ -43,6 +48,10 @@ from lib.cloud import (
CloudEnvironmentConfig,
)
from lib.data import (
data_context,
)
def setup_common_temp_dir(args, path):
"""
@ -57,7 +66,14 @@ def setup_common_temp_dir(args, path):
coverage_config_path = os.path.join(path, COVERAGE_CONFIG_PATH)
shutil.copy(COVERAGE_CONFIG_PATH, coverage_config_path)
if data_context().content.collection:
coverage_config = generate_collection_coverage_config(args)
with open(coverage_config_path, 'w') as coverage_config_fd:
coverage_config_fd.write(coverage_config)
else:
shutil.copy(os.path.join(ANSIBLE_ROOT, COVERAGE_CONFIG_PATH), coverage_config_path)
os.chmod(coverage_config_path, MODE_FILE)
coverage_output_path = os.path.join(path, COVERAGE_OUTPUT_PATH)
@ -145,9 +161,6 @@ def integration_test_environment(args, target, inventory_path):
display.warning('Disabling unicode in the temp work dir is a temporary debugging feature that may be removed in the future without notice.')
suffix = '-ansible'
if isinstance('', bytes):
suffix = suffix.encode('utf-8')
if args.explain:
temp_dir = os.path.join(root_temp_dir, '%stemp%s' % (prefix, suffix))
else:
@ -175,9 +188,9 @@ def integration_test_environment(args, target, inventory_path):
ansible_config = os.path.join(integration_dir, '%s.cfg' % args.command)
file_copies = [
(os.path.join(INSTALL_ROOT, 'test/integration/%s.cfg' % args.command), ansible_config),
(os.path.join(INSTALL_ROOT, 'test/integration/integration_config.yml'), os.path.join(integration_dir, vars_file)),
(os.path.join(INSTALL_ROOT, inventory_path), os.path.join(integration_dir, inventory_name)),
(os.path.join(ANSIBLE_ROOT, 'test/integration/%s.cfg' % args.command), ansible_config),
(os.path.join(ANSIBLE_ROOT, 'test/integration/integration_config.yml'), os.path.join(integration_dir, vars_file)),
(os.path.join(ANSIBLE_ROOT, inventory_path), os.path.join(integration_dir, inventory_name)),
]
file_copies += [(path, os.path.join(temp_dir, path)) for path in files_needed]
@ -207,7 +220,7 @@ def integration_test_environment(args, target, inventory_path):
display.info('Copying %s/ to %s/' % (dir_src, dir_dst), verbosity=2)
if not args.explain:
shutil.copytree(dir_src, dir_dst, symlinks=True)
shutil.copytree(to_bytes(dir_src), to_bytes(dir_dst), symlinks=True)
for file_src, file_dst in file_copies:
display.info('Copying %s to %s' % (file_src, file_dst), verbosity=2)

View file

@ -10,12 +10,13 @@ from lib.util import (
SubprocessError,
ApplicationError,
cmd_quote,
display,
)
from lib.util_common import (
intercept_command,
run_command,
INSTALL_ROOT,
ANSIBLE_ROOT,
)
from lib.core_ci import (
@ -214,22 +215,36 @@ class ManagePosixCI:
def setup(self, python_version):
"""Start instance and wait for it to become ready and respond to an ansible ping.
:type python_version: str
:rtype: str
"""
self.wait()
pwd = self.wait()
display.info('Remote working directory: %s' % pwd, verbosity=1)
if isinstance(self.core_ci.args, ShellConfig):
if self.core_ci.args.raw:
return
return pwd
self.configure(python_version)
self.upload_source()
def wait(self):
return pwd
def wait(self): # type: () -> str
"""Wait for instance to respond to SSH."""
for dummy in range(1, 90):
try:
self.ssh('id')
return
stdout = self.ssh('pwd', capture=True)[0]
if self.core_ci.args.explain:
return '/pwd'
pwd = stdout.strip().splitlines()[-1]
if not pwd.startswith('/'):
raise Exception('Unexpected current working directory "%s" from "pwd" command output:\n%s' % (pwd, stdout))
return pwd
except SubprocessError:
time.sleep(10)
@ -240,7 +255,7 @@ class ManagePosixCI:
"""Configure remote host for testing.
:type python_version: str
"""
self.upload(os.path.join(INSTALL_ROOT, 'test/runner/setup/remote.sh'), '/tmp')
self.upload(os.path.join(ANSIBLE_ROOT, 'test/runner/setup/remote.sh'), '/tmp')
self.ssh('chmod +x /tmp/remote.sh && /tmp/remote.sh %s %s' % (self.core_ci.platform, python_version))
def upload_source(self):
@ -268,10 +283,12 @@ class ManagePosixCI:
"""
self.scp(local, '%s@%s:%s' % (self.core_ci.connection.username, self.core_ci.connection.hostname, remote))
def ssh(self, command, options=None):
def ssh(self, command, options=None, capture=False):
"""
:type command: str | list[str]
:type options: list[str] | None
:type capture: bool
:rtype: str | None, str | None
"""
if not options:
options = []
@ -279,12 +296,12 @@ class ManagePosixCI:
if isinstance(command, list):
command = ' '.join(cmd_quote(c) for c in command)
run_command(self.core_ci.args,
return run_command(self.core_ci.args,
['ssh', '-tt', '-q'] + self.ssh_args +
options +
['-p', str(self.core_ci.connection.port),
'%s@%s' % (self.core_ci.connection.username, self.core_ci.connection.hostname)] +
self.become + [cmd_quote(command)])
self.become + [cmd_quote(command)], capture=capture)
def scp(self, src, dst):
"""

View file

@ -2,26 +2,80 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import tarfile
import time
from lib.config import (
CommonConfig,
EnvironmentConfig,
IntegrationConfig,
ShellConfig,
)
from lib.pytar import (
AllowGitTarFilter,
create_tarfile,
DefaultTarFilter,
from lib.util import (
display,
ANSIBLE_ROOT,
)
from lib.data import (
data_context,
)
from lib.util_common import (
CommonConfig,
)
# improve performance by disabling uid/gid lookups
tarfile.pwd = None
tarfile.grp = None
def create_payload(args, dst_path): # type: (CommonConfig, str) -> None
"""Create a payload for delegation."""
if args.explain:
return
if isinstance(args, EnvironmentConfig) and args.docker_keep_git:
tar_filter = AllowGitTarFilter()
else:
tar_filter = DefaultTarFilter()
files = [(os.path.join(ANSIBLE_ROOT, path), path) for path in data_context().install.all_files()]
create_tarfile(dst_path, '.', tar_filter)
if not data_context().content.is_ansible:
files = [f for f in files if
f[1].startswith('bin/') or
f[1].startswith('lib/') or
f[1].startswith('test/runner/') or
f[1].startswith('packaging/requirements/') or
f[1].startswith('test/sanity/') or # sanity only
f[1].startswith('test/units/pytest/plugins/') or # units only
f[1] in (
'setup.py',
'README.rst',
'requirements.txt',
# units only
'test/units/ansible.cfg',
# integration only
'test/integration/integration.cfg',
'test/integration/integration_config.yml',
'test/integration/inventory',
)]
if not isinstance(args, (ShellConfig, IntegrationConfig)):
files = [f for f in files if not f[1].startswith('lib/ansible/modules/') or f[1] == 'lib/ansible/modules/__init__.py']
if data_context().content.collection:
files.extend((os.path.join(data_context().content.root, path), os.path.join(data_context().content.collection.directory, path))
for path in data_context().content.all_files())
for callback in data_context().payload_callbacks:
callback(files)
display.info('Creating a payload archive containing %d files...' % len(files), verbosity=1)
start = time.time()
with tarfile.TarFile.gzopen(dst_path, mode='w', compresslevel=4) as tar:
for src, dst in files:
display.info('%s -> %s' % (src, dst), verbosity=4)
tar.add(src, dst)
duration = time.time() - start
payload_size_bytes = os.path.getsize(dst_path)
display.info('Created a %d byte payload archive containing %d files in %d seconds.' % (payload_size_bytes, len(files), duration), verbosity=1)

View file

@ -9,6 +9,10 @@ from lib.util import (
display,
)
from lib.data import (
data_context,
)
def get_powershell_module_utils_imports(powershell_targets):
"""Return a dictionary of module_utils names mapped to sets of powershell file paths.
@ -36,11 +40,27 @@ def get_powershell_module_utils_imports(powershell_targets):
return imports
def get_powershell_module_utils_name(path): # type: (str) -> str
"""Return a namespace and name from the given module_utils path."""
base_path = data_context().content.module_utils_powershell_path
if data_context().content.collection:
prefix = 'AnsibleCollections.' + data_context().content.collection.prefix
else:
prefix = ''
name = prefix + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.sep, '.')
return name
def enumerate_module_utils():
"""Return a list of available module_utils imports.
:rtype: set[str]
"""
return set(os.path.splitext(p)[0] for p in os.listdir('lib/ansible/module_utils/powershell') if os.path.splitext(p)[1] == '.psm1')
return set(get_powershell_module_utils_name(p)
for p in data_context().content.walk_files(data_context().content.module_utils_powershell_path)
if os.path.splitext(p)[1] == '.psm1')
def extract_powershell_module_utils_imports(path, module_utils):
@ -62,7 +82,7 @@ def extract_powershell_module_utils_imports(path, module_utils):
for line in lines:
line_number += 1
match = re.search(r'(?i)^#\s*requires\s+-module(?:s?)\s*(Ansible\.ModuleUtils\..+)', line)
match = re.search(r'(?i)^#\s*(?:requires\s+-module(?:s?)|ansiblerequires\s+-powershell)\s*((?:Ansible|AnsibleCollections)\..+)', line)
if not match:
continue

View file

@ -0,0 +1,74 @@
"""Provider (plugin) infrastructure for ansible-test."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import abc
import os
import lib.types as t
from lib.util import (
ABC,
ApplicationError,
get_subclasses,
)
try:
C = t.TypeVar('C', 'PathProvider', 'PathProvider')
except AttributeError:
pass
def get_path_provider_classes(provider_type): # type: (t.Type[C]) -> t.List[t.Type[C]]
"""Return a list of path provider classes of the given type."""
return sorted(get_subclasses(provider_type), key=lambda c: (c.priority, c.__name__))
def find_path_provider(provider_type, provider_classes, path, walk): # type: (t.Type[C], t.List[t.Type[C]], str, bool) -> C
"""Return the first found path provider of the given type for the given path."""
sequences = sorted(set(pc.sequence for pc in provider_classes if pc.sequence > 0))
for sequence in sequences:
candidate_path = path
tier_classes = [pc for pc in provider_classes if pc.sequence == sequence]
while True:
for provider_class in tier_classes:
if provider_class.is_content_root(candidate_path):
return provider_class(candidate_path)
if not walk:
break
parent_path = os.path.dirname(candidate_path)
if parent_path == candidate_path:
break
candidate_path = parent_path
raise ProviderNotFoundForPath(provider_type, path)
class ProviderNotFoundForPath(ApplicationError):
"""Exception generated when a path based provider cannot be found for a given path."""
def __init__(self, provider_type, path): # type: (t.Type, str) -> None
super(ProviderNotFoundForPath, self).__init__('No %s found for path: %s' % (provider_type.__name__, path))
self.provider_type = provider_type
self.path = path
class PathProvider(ABC):
"""Base class for provider plugins that are path based."""
sequence = 500
priority = 500
def __init__(self, root): # type: (str) -> None
self.root = root
@staticmethod
@abc.abstractmethod
def is_content_root(path): # type: (str) -> bool
"""Return True if the given path is a content root for this provider."""

View file

@ -0,0 +1,183 @@
"""Code for finding content."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import abc
import collections
import os
import lib.types as t
from lib.util import (
ANSIBLE_ROOT,
)
from .. import (
PathProvider,
)
class Layout:
"""Description of content locations and helper methods to access content."""
def __init__(self,
root, # type: str
paths, # type: t.List[str]
): # type: (...) -> None
self.root = root
self.__paths = paths
self.__tree = paths_to_tree(paths)
def all_files(self): # type: () -> t.List[str]
"""Return a list of all file paths."""
return self.__paths
def walk_files(self, directory): # type: (str) -> t.List[str]
"""Return a list of file paths found recursively under the given directory."""
parts = directory.rstrip(os.sep).split(os.sep)
item = get_tree_item(self.__tree, parts)
if not item:
return []
directories = collections.deque(item[0].values())
files = list(item[1])
while directories:
item = directories.pop()
directories.extend(item[0].values())
files.extend(item[1])
return files
def get_dirs(self, directory): # type: (str) -> t.List[str]
"""Return a list directory paths found directly under the given directory."""
parts = directory.rstrip(os.sep).split(os.sep)
item = get_tree_item(self.__tree, parts)
return [os.path.join(directory, key) for key in item[0].keys()] if item else []
def get_files(self, directory): # type: (str) -> t.List[str]
"""Return a list of file paths found directly under the given directory."""
parts = directory.rstrip(os.sep).split(os.sep)
item = get_tree_item(self.__tree, parts)
return item[1] if item else []
class InstallLayout(Layout):
"""Information about the current Ansible install."""
class ContentLayout(Layout):
"""Information about the current Ansible content being tested."""
def __init__(self,
root, # type: str
paths, # type: t.List[str]
plugin_paths, # type: t.Dict[str, str]
provider_paths, # type: t.Dict[str, str]
code_path=None, # type: t.Optional[str]
collection=None, # type: t.Optional[CollectionDetail]
util_path=None, # type: t.Optional[str]
unit_path=None, # type: t.Optional[str]
unit_module_path=None, # type: t.Optional[str]
integration_path=None, # type: t.Optional[str]
): # type: (...) -> None
super(ContentLayout, self).__init__(root, paths)
self.plugin_paths = plugin_paths
self.provider_paths = provider_paths
self.code_path = code_path
self.collection = collection
self.util_path = util_path
self.unit_path = unit_path
self.unit_module_path = unit_module_path
self.integration_path = integration_path
self.is_ansible = root == ANSIBLE_ROOT
@property
def prefix(self): # type: () -> str
"""Return the collection prefix or an empty string if not a collection."""
if self.collection:
return self.collection.prefix
return ''
@property
def module_path(self): # type: () -> t.Optional[str]
"""Return the path where modules are found, if any."""
return self.plugin_paths.get('modules')
@property
def module_utils_path(self): # type: () -> t.Optional[str]
"""Return the path where module_utils are found, if any."""
return self.plugin_paths.get('module_utils')
@property
def module_utils_powershell_path(self): # type: () -> t.Optional[str]
"""Return the path where powershell module_utils are found, if any."""
if self.is_ansible:
return os.path.join(self.plugin_paths['module_utils'], 'powershell')
return self.plugin_paths.get('module_utils')
@property
def module_utils_csharp_path(self): # type: () -> t.Optional[str]
"""Return the path where csharp module_utils are found, if any."""
if self.is_ansible:
return os.path.join(self.plugin_paths['module_utils'], 'csharp')
return self.plugin_paths.get('module_utils')
class CollectionDetail:
"""Details about the layout of the current collection."""
def __init__(self,
name, # type: str
namespace, # type: str
root, # type: str
prefix, # type: str
): # type: (...) -> None
self.name = name
self.namespace = namespace
self.root = root
self.prefix = prefix
self.directory = os.path.join('ansible_collections', namespace, name)
class LayoutProvider(PathProvider):
"""Base class for layout providers."""
@abc.abstractmethod
def create(self, root, paths): # type: (str, t.List[str]) -> ContentLayout
"""Create a layout using the given root and paths."""
def paths_to_tree(paths): # type: (t.List[str]) -> t.Tuple(t.Dict[str, t.Any], t.List[str])
"""Return a filesystem tree from the given list of paths."""
tree = {}, []
for path in paths:
parts = path.split(os.sep)
root = tree
for part in parts[:-1]:
if part not in root[0]:
root[0][part] = {}, []
root = root[0][part]
root[1].append(path)
return tree
def get_tree_item(tree, parts): # type: (t.Tuple(t.Dict[str, t.Any], t.List[str]), t.List[str]) -> t.Optional[t.Tuple(t.Dict[str, t.Any], t.List[str])]
"""Return the portion of the tree found under the path given by parts, or None if it does not exist."""
root = tree
for part in parts:
root = root[0].get(part)
if not root:
return None
return root

View file

@ -0,0 +1,45 @@
"""Layout provider for Ansible source."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import re
import lib.types as t
from . import (
ContentLayout,
LayoutProvider,
)
class AnsibleLayout(LayoutProvider):
"""Layout provider for Ansible source."""
@staticmethod
def is_content_root(path): # type: (str) -> bool
"""Return True if the given path is a content root for this provider."""
return os.path.exists(os.path.join(path, 'setup.py')) and os.path.exists(os.path.join(path, 'bin/ansible-test'))
def create(self, root, paths): # type: (str, t.List[str]) -> ContentLayout
"""Create a Layout using the given root and paths."""
plugin_types = sorted(set(p.split('/')[3] for p in paths if re.search(r'^lib/ansible/plugins/[^/]+/', p)))
provider_types = sorted(set(p.split('/')[4] for p in paths if re.search(r'^test/runner/lib/provider/[^/]+/', p)))
plugin_paths = dict((p, os.path.join('lib/ansible/plugins', p)) for p in plugin_types)
provider_paths = dict((p, os.path.join('test/runner/lib/provider', p)) for p in provider_types)
plugin_paths.update(dict(
modules='lib/ansible/modules',
module_utils='lib/ansible/module_utils',
))
return ContentLayout(root,
paths,
plugin_paths=plugin_paths,
provider_paths=provider_paths,
code_path='lib/ansible',
util_path='test/utils',
unit_path='test/units',
unit_module_path='test/units/modules',
integration_path='test/integration',
)

View file

@ -0,0 +1,60 @@
"""Layout provider for Ansible collections."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import re
import lib.types as t
from . import (
ContentLayout,
LayoutProvider,
CollectionDetail,
)
class CollectionLayout(LayoutProvider):
"""Layout provider for Ansible collections."""
__module_path = 'plugins/modules'
__unit_path = 'test/unit'
@staticmethod
def is_content_root(path): # type: (str) -> bool
"""Return True if the given path is a content root for this provider."""
if os.path.basename(os.path.dirname(os.path.dirname(path))) == 'ansible_collections':
return True
return False
def create(self, root, paths): # type: (str, t.List[str]) -> ContentLayout
"""Create a Layout using the given root and paths."""
plugin_types = sorted(set(p.split('/')[1] for p in paths if re.search(r'^plugins/[^/]+/', p)))
provider_types = sorted(set(p.split('/')[2] for p in paths if re.search(r'^test/provider/[^/]+/', p)))
plugin_paths = dict((p, os.path.join('plugins', p)) for p in plugin_types)
provider_paths = dict((p, os.path.join('test/provider', p)) for p in provider_types)
collection_root = os.path.dirname(os.path.dirname(root))
collection_dir = os.path.relpath(root, collection_root)
collection_namespace, collection_name = collection_dir.split(os.sep)
collection_prefix = '%s.%s.' % (collection_namespace, collection_name)
collection_root = os.path.dirname(collection_root)
return ContentLayout(root,
paths,
plugin_paths=plugin_paths,
provider_paths=provider_paths,
code_path='',
collection=CollectionDetail(
name=collection_name,
namespace=collection_namespace,
root=collection_root,
prefix=collection_prefix,
),
util_path='test/util',
unit_path='test/unit',
unit_module_path='test/units/plugins/modules',
integration_path='test/integration',
)

View file

@ -0,0 +1,18 @@
"""Common code for source providers."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import abc
import lib.types as t
from .. import (
PathProvider,
)
class SourceProvider(PathProvider):
"""Base class for source providers."""
@abc.abstractmethod
def get_paths(self, path): # type: (str) -> t.List[str]
"""Return the list of available content paths under the given path."""

View file

@ -0,0 +1,31 @@
"""Source provider for a content root managed by git version control."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import lib.types as t
from lib.git import (
Git,
)
from . import (
SourceProvider,
)
class GitSource(SourceProvider):
"""Source provider for a content root managed by git version control."""
@staticmethod
def is_content_root(path): # type: (str) -> bool
"""Return True if the given path is a content root for this provider."""
return os.path.exists(os.path.join(path, '.git'))
def get_paths(self, path): # type: (str) -> t.List[str]
"""Return the list of available content paths under the given path."""
git = Git(path)
paths = git.get_file_names(['--cached', '--others', '--exclude-standard'])
return paths

View file

@ -0,0 +1,77 @@
"""Fallback source provider when no other provider matches the content root."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import lib.types as t
from lib.constants import (
TIMEOUT_PATH,
)
from . import (
SourceProvider,
)
class UnversionedSource(SourceProvider):
"""Fallback source provider when no other provider matches the content root."""
sequence = 0 # disable automatic detection
@staticmethod
def is_content_root(path): # type: (str) -> bool
"""Return True if the given path is a content root for this provider."""
return False
def get_paths(self, path): # type: (str) -> t.List[str]
"""Return the list of available content paths under the given path."""
paths = []
kill_any_dir = (
'.idea',
'.pytest_cache',
'__pycache__',
'ansible.egg-info',
)
kill_sub_dir = {
'test/runner': (
'.tox',
),
'test': (
'results',
'cache',
),
'docs/docsite': (
'_build',
),
}
kill_sub_file = {
'': (
TIMEOUT_PATH,
),
}
kill_extensions = (
'.pyc',
'.retry',
)
for root, dir_names, file_names in os.walk(path):
rel_root = os.path.relpath(root, path)
if rel_root == '.':
rel_root = ''
for kill in kill_any_dir + kill_sub_dir.get(rel_root, ()):
if kill in dir_names:
dir_names.remove(kill)
kill_files = kill_sub_file.get(rel_root, ())
paths.extend([os.path.join(rel_root, file_name) for file_name in file_names
if not os.path.splitext(file_name)[1] in kill_extensions and file_name not in kill_files])
return paths

View file

@ -1,109 +0,0 @@
"""Python native TGZ creation."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import abc
import tarfile
import os
from lib.util import (
display,
ABC,
)
from lib.constants import (
TIMEOUT_PATH,
)
# improve performance by disabling uid/gid lookups
tarfile.pwd = None
tarfile.grp = None
class TarFilter(ABC):
"""Filter to use when creating a tar file."""
@abc.abstractmethod
def ignore(self, item):
"""
:type item: tarfile.TarInfo
:rtype: tarfile.TarInfo | None
"""
class DefaultTarFilter(TarFilter):
"""
To reduce archive time and size, ignore non-versioned files which are large or numerous.
Also ignore miscellaneous git related files since the .git directory is ignored.
"""
def __init__(self):
self.ignore_dirs = (
'.tox',
'.git',
'.idea',
'.pytest_cache',
'__pycache__',
'ansible.egg-info',
)
self.ignore_files = (
'.gitignore',
'.gitdir',
TIMEOUT_PATH,
)
self.ignore_extensions = (
'.pyc',
'.retry',
)
def ignore(self, item):
"""
:type item: tarfile.TarInfo
:rtype: tarfile.TarInfo | None
"""
filename = os.path.basename(item.path)
ext = os.path.splitext(filename)[1]
dirs = os.path.split(item.path)
if not item.isdir():
if item.path.startswith('./test/results/'):
return None
if item.path.startswith('./docs/docsite/_build/'):
return None
if filename in self.ignore_files:
return None
if ext in self.ignore_extensions:
return None
if any(d in self.ignore_dirs for d in dirs):
return None
return item
class AllowGitTarFilter(DefaultTarFilter):
"""
Filter that allows git related files normally excluded by the default tar filter.
"""
def __init__(self):
super(AllowGitTarFilter, self).__init__()
self.ignore_dirs = tuple(d for d in self.ignore_dirs if not d.startswith('.git'))
self.ignore_files = tuple(f for f in self.ignore_files if not f.startswith('.git'))
def create_tarfile(dst_path, src_path, tar_filter):
"""
:type dst_path: str
:type src_path: str
:type tar_filter: TarFilter
"""
display.info('Creating a compressed tar archive of path: %s' % src_path, verbosity=1)
with tarfile.TarFile.gzopen(dst_path, mode='w', compresslevel=4) as tar:
tar.add(src_path, filter=tar_filter.ignore)
display.info('Resulting archive is %d bytes.' % os.path.getsize(dst_path), verbosity=1)

View file

@ -7,7 +7,6 @@ import glob
import json
import os
import re
import sys
import lib.types as t
@ -19,7 +18,7 @@ from lib.util import (
load_plugins,
parse_to_list_of_dict,
ABC,
INSTALL_ROOT,
ANSIBLE_ROOT,
is_binary_file,
read_lines_without_comments,
)
@ -57,6 +56,10 @@ from lib.test import (
TestMessage,
)
from lib.data import (
data_context,
)
COMMAND = 'sanity'
@ -145,9 +148,14 @@ def collect_code_smell_tests():
:rtype: tuple[SanityFunc]
"""
skip_file = 'test/sanity/code-smell/skip.txt'
ansible_only_file = os.path.join(ANSIBLE_ROOT, 'test/sanity/code-smell/ansible-only.txt')
skip_tests = read_lines_without_comments(skip_file, remove_blank_lines=True, optional=True)
paths = glob.glob(os.path.join(INSTALL_ROOT, 'test/sanity/code-smell/*'))
if not data_context().content.is_ansible:
skip_tests += read_lines_without_comments(ansible_only_file, remove_blank_lines=True)
paths = glob.glob(os.path.join(ANSIBLE_ROOT, 'test/sanity/code-smell/*'))
paths = sorted(p for p in paths if os.access(p, os.X_OK) and os.path.isfile(p) and os.path.basename(p) not in skip_tests)
tests = tuple(SanityCodeSmellTest(p) for p in paths)
@ -215,6 +223,8 @@ class SanityTest(ABC):
"""Sanity test base class."""
__metaclass__ = abc.ABCMeta
ansible_only = False
def __init__(self, name):
self.name = name
self.enabled = True
@ -288,10 +298,6 @@ class SanityCodeSmellTest(SanityTest):
if always:
paths = []
# short-term work-around for paths being str instead of unicode on python 2.x
if sys.version_info[0] == 2:
paths = [p.decode('utf-8') for p in paths]
if text is not None:
if text:
paths = [p for p in paths if not is_binary_file(p)]
@ -385,6 +391,6 @@ def sanity_init():
import_plugins('sanity')
sanity_plugins = {} # type: t.Dict[str, t.Type[SanityFunc]]
load_plugins(SanityFunc, sanity_plugins)
sanity_tests = tuple([plugin() for plugin in sanity_plugins.values()])
sanity_tests = tuple([plugin() for plugin in sanity_plugins.values() if data_context().content.is_ansible or not plugin.ansible_only])
global SANITY_TESTS # pylint: disable=locally-disabled, global-statement
SANITY_TESTS = tuple(sorted(sanity_tests + collect_code_smell_tests(), key=lambda k: k.name))

View file

@ -32,6 +32,14 @@ from lib.config import (
SanityConfig,
)
from lib.data import (
data_context,
)
from lib.coverage_util import (
coverage_context,
)
class AnsibleDocTest(SanityMultipleVersion):
"""Sanity test for ansible-doc."""
@ -69,19 +77,19 @@ class AnsibleDocTest(SanityMultipleVersion):
target_paths = collections.defaultdict(dict)
for module in modules:
doc_targets['module'].append(module)
doc_targets['module'].append(data_context().content.prefix + module)
for plugin_type, plugin_name, plugin_path in plugins:
if plugin_type in plugin_type_blacklist:
continue
doc_targets[plugin_type].append(plugin_name)
target_paths[plugin_type][plugin_name] = plugin_path
doc_targets[plugin_type].append(data_context().content.prefix + plugin_name)
target_paths[plugin_type][data_context().content.prefix + plugin_name] = plugin_path
if not doc_targets:
return SanitySkipped(self.name, python_version=python_version)
target_paths['module'] = dict((t.module, t.path) for t in targets.targets if t.module)
target_paths['module'] = dict((data_context().content.prefix + t.module, t.path) for t in targets.targets if t.module)
env = ansible_environment(args, color=False)
error_messages = []
@ -90,7 +98,9 @@ class AnsibleDocTest(SanityMultipleVersion):
cmd = ['ansible-doc', '-t', doc_type] + sorted(doc_targets[doc_type])
try:
with coverage_context(args):
stdout, stderr = intercept_command(args, cmd, target_name='ansible-doc', env=env, capture=True, python_version=python_version)
status = 0
except SubprocessError as ex:
stdout = ex.stdout

View file

@ -18,7 +18,7 @@ from lib.util import (
find_python,
read_lines_without_comments,
parse_to_list_of_dict,
INSTALL_ROOT,
ANSIBLE_ROOT,
)
from lib.util_common import (
@ -55,7 +55,7 @@ class CompileTest(SanityMultipleVersion):
if not paths:
return SanitySkipped(self.name, python_version=python_version)
cmd = [find_python(python_version), os.path.join(INSTALL_ROOT, 'test/sanity/compile/compile.py')]
cmd = [find_python(python_version), os.path.join(ANSIBLE_ROOT, 'test/sanity/compile/compile.py')]
data = '\n'.join(paths)

View file

@ -26,7 +26,6 @@ from lib.util import (
from lib.util_common import (
intercept_command,
run_command,
INSTALL_ROOT,
)
from lib.ansible_util import (
@ -41,6 +40,15 @@ from lib.config import (
SanityConfig,
)
from lib.coverage_util import (
coverage_context,
)
from lib.data import (
data_context,
ANSIBLE_ROOT,
)
class ImportTest(SanityMultipleVersion):
"""Sanity test for proper import exception handling."""
@ -60,7 +68,7 @@ class ImportTest(SanityMultipleVersion):
i.path
for i in targets.include
if os.path.splitext(i.path)[1] == '.py' and
(is_subdir(i.path, 'lib/ansible/modules/') or is_subdir(i.path, 'lib/ansible/module_utils/')) and
(is_subdir(i.path, data_context().content.module_path) or is_subdir(i.path, data_context().content.module_utils_path)) and
i.path not in skip_paths_set
)
@ -87,7 +95,7 @@ class ImportTest(SanityMultipleVersion):
# add the importer to our virtual environment so it can be accessed through the coverage injector
importer_path = os.path.join(virtual_environment_bin, 'importer.py')
if not args.explain:
os.symlink(os.path.abspath(os.path.join(INSTALL_ROOT, 'test/sanity/import/importer.py')), importer_path)
os.symlink(os.path.abspath(os.path.join(ANSIBLE_ROOT, 'test/sanity/import/importer.py')), importer_path)
# create a minimal python library
python_path = os.path.abspath('test/runner/.tox/import/lib')
@ -96,13 +104,14 @@ class ImportTest(SanityMultipleVersion):
ansible_link = os.path.join(ansible_path, 'module_utils')
if not args.explain:
remove_tree(ansible_path)
make_dirs(ansible_path)
with open(ansible_init, 'w'):
pass
if not os.path.exists(ansible_link):
os.symlink('../../../../../../lib/ansible/module_utils', ansible_link)
os.symlink(os.path.join(ANSIBLE_ROOT, 'lib/ansible/module_utils'), ansible_link)
# activate the virtual environment
env['PATH'] = '%s:%s' % (virtual_environment_bin, env['PATH'])
@ -126,7 +135,9 @@ class ImportTest(SanityMultipleVersion):
virtualenv_python = os.path.join(virtual_environment_bin, 'python')
try:
stdout, stderr = intercept_command(args, cmd, self.name, env, capture=True, data=data, python_version=python_version, virtualenv=virtualenv_python)
with coverage_context(args):
stdout, stderr = intercept_command(args, cmd, self.name, env, capture=True, data=data, python_version=python_version,
virtualenv=virtualenv_python)
if stdout or stderr:
raise SubprocessError(cmd, stdout=stdout, stderr=stderr)

View file

@ -80,6 +80,8 @@ class IntegrationAliasesTest(SanitySingleVersion):
Consider adding integration tests before or alongside changes.
"""
ansible_only = True
def __init__(self):
super(IntegrationAliasesTest, self).__init__()

View file

@ -17,7 +17,7 @@ from lib.util import (
display,
read_lines_without_comments,
parse_to_list_of_dict,
INSTALL_ROOT,
ANSIBLE_ROOT,
)
from lib.util_common import (
@ -47,10 +47,10 @@ class Pep8Test(SanitySingleVersion):
skip_paths = read_lines_without_comments(PEP8_SKIP_PATH, optional=True)
legacy_paths = read_lines_without_comments(PEP8_LEGACY_PATH, optional=True)
legacy_ignore_file = os.path.join(INSTALL_ROOT, 'test/sanity/pep8/legacy-ignore.txt')
legacy_ignore_file = os.path.join(ANSIBLE_ROOT, 'test/sanity/pep8/legacy-ignore.txt')
legacy_ignore = set(read_lines_without_comments(legacy_ignore_file, remove_blank_lines=True))
current_ignore_file = os.path.join(INSTALL_ROOT, 'test/sanity/pep8/current-ignore.txt')
current_ignore_file = os.path.join(ANSIBLE_ROOT, 'test/sanity/pep8/current-ignore.txt')
current_ignore = sorted(read_lines_without_comments(current_ignore_file, remove_blank_lines=True))
skip_paths_set = set(skip_paths)

View file

@ -25,7 +25,7 @@ from lib.util import (
from lib.util_common import (
run_command,
INSTALL_ROOT,
ANSIBLE_ROOT,
)
from lib.config import (
@ -37,6 +37,10 @@ from lib.test import (
calculate_best_confidence,
)
from lib.data import (
data_context,
)
PSLINT_SKIP_PATH = 'test/sanity/pslint/skip.txt'
PSLINT_IGNORE_PATH = 'test/sanity/pslint/ignore.txt'
@ -85,8 +89,8 @@ class PslintTest(SanitySingleVersion):
# Make sure requirements are installed before running sanity checks
cmds = [
[os.path.join(INSTALL_ROOT, 'test/runner/requirements/sanity.ps1')],
[os.path.join(INSTALL_ROOT, 'test/sanity/pslint/pslint.ps1')] + paths
[os.path.join(ANSIBLE_ROOT, 'test/runner/requirements/sanity.ps1')],
[os.path.join(ANSIBLE_ROOT, 'test/sanity/pslint/pslint.ps1')] + paths
]
stdout = ''
@ -113,7 +117,7 @@ class PslintTest(SanitySingleVersion):
'ParseError',
]
cwd = os.getcwd() + '/'
cwd = data_context().content.root + '/'
# replace unicode smart quotes and ellipsis with ascii versions
stdout = re.sub(u'[\u2018\u2019]', "'", stdout)

View file

@ -23,7 +23,7 @@ from lib.util import (
display,
read_lines_without_comments,
ConfigParser,
INSTALL_ROOT,
ANSIBLE_ROOT,
is_subdir,
)
@ -48,6 +48,11 @@ from lib.test import (
calculate_best_confidence,
)
from lib.data import (
data_context,
)
PYLINT_SKIP_PATH = 'test/sanity/pylint/skip.txt'
PYLINT_IGNORE_PATH = 'test/sanity/pylint/ignore.txt'
@ -69,7 +74,7 @@ class PylintTest(SanitySingleVersion):
display.warning('Skipping pylint on unsupported Python version %s.' % args.python_version)
return SanitySkipped(self.name)
plugin_dir = os.path.join(INSTALL_ROOT, 'test/sanity/pylint/plugins')
plugin_dir = os.path.join(ANSIBLE_ROOT, 'test/sanity/pylint/plugins')
plugin_names = sorted(p[0] for p in [
os.path.splitext(p) for p in os.listdir(plugin_dir)] if p[1] == '.py' and p[0] != '__init__')
@ -116,15 +121,16 @@ class PylintTest(SanitySingleVersion):
paths = sorted(i.path for i in targets.include if (os.path.splitext(i.path)[1] == '.py' or is_subdir(i.path, 'bin/')) and i.path not in skip_paths_set)
module_paths = [os.path.relpath(p, 'lib/ansible/modules/').split(os.path.sep) for p in paths if is_subdir(p, 'lib/ansible/modules/')]
module_paths = [os.path.relpath(p, data_context().content.module_path).split(os.path.sep) for p in
paths if is_subdir(p, data_context().content.module_path)]
module_dirs = sorted(set([p[0] for p in module_paths if len(p) > 1]))
large_module_group_threshold = 500
large_module_groups = [key for key, value in
itertools.groupby(module_paths, lambda p: p[0] if len(p) > 1 else '') if len(list(value)) > large_module_group_threshold]
large_module_group_paths = [os.path.relpath(p, 'lib/ansible/modules/').split(os.path.sep) for p in paths
if any(is_subdir(p, os.path.join('lib/ansible/modules/', g)) for g in large_module_groups)]
large_module_group_paths = [os.path.relpath(p, data_context().content.module_path).split(os.path.sep) for p in paths
if any(is_subdir(p, os.path.join(data_context().content.module_path, g)) for g in large_module_groups)]
large_module_group_dirs = sorted(set([os.path.sep.join(p[:2]) for p in large_module_group_paths if len(p) > 2]))
contexts = []
@ -154,17 +160,20 @@ class PylintTest(SanitySingleVersion):
return context_filter
for large_module_group_dir in large_module_group_dirs:
add_context(remaining_paths, 'modules/%s' % large_module_group_dir, filter_path('lib/ansible/modules/%s/' % large_module_group_dir))
for large_module_dir in large_module_group_dirs:
add_context(remaining_paths, 'modules/%s' % large_module_dir, filter_path(os.path.join(data_context().content.module_path, large_module_dir)))
for module_dir in module_dirs:
add_context(remaining_paths, 'modules/%s' % module_dir, filter_path('lib/ansible/modules/%s/' % module_dir))
add_context(remaining_paths, 'modules/%s' % module_dir, filter_path(os.path.join(data_context().content.module_path, module_dir)))
add_context(remaining_paths, 'modules', filter_path('lib/ansible/modules/'))
add_context(remaining_paths, 'module_utils', filter_path('lib/ansible/module_utils/'))
add_context(remaining_paths, 'modules', filter_path(data_context().content.module_path))
add_context(remaining_paths, 'module_utils', filter_path(data_context().content.module_utils_path))
add_context(remaining_paths, 'units', filter_path('test/units/'))
add_context(remaining_paths, 'units', filter_path(data_context().content.unit_path))
if data_context().content.collection:
add_context(remaining_paths, 'collection', lambda p: True)
else:
add_context(remaining_paths, 'validate-modules', filter_path('test/sanity/validate-modules/'))
add_context(remaining_paths, 'sanity', filter_path('test/sanity/'))
add_context(remaining_paths, 'ansible-test', filter_path('test/runner/'))
@ -272,10 +281,10 @@ class PylintTest(SanitySingleVersion):
@staticmethod
def pylint(args, context, paths, plugin_dir, plugin_names): # type: (SanityConfig, str, t.List[str], str, t.List[str]) -> t.List[t.Dict[str, str]]
"""Run pylint using the config specified by the context on the specified paths."""
rcfile = os.path.join(INSTALL_ROOT, 'test/sanity/pylint/config/%s' % context.split('/')[0])
rcfile = os.path.join(ANSIBLE_ROOT, 'test/sanity/pylint/config/%s' % context.split('/')[0])
if not os.path.exists(rcfile):
rcfile = os.path.join(INSTALL_ROOT, 'test/sanity/pylint/config/default')
rcfile = os.path.join(ANSIBLE_ROOT, 'test/sanity/pylint/config/default')
parser = ConfigParser()
parser.read(rcfile)
@ -301,6 +310,9 @@ class PylintTest(SanitySingleVersion):
append_python_path = [plugin_dir]
if data_context().content.collection:
append_python_path.append(data_context().content.collection.root)
env = ansible_environment(args)
env['PYTHONPATH'] += os.path.pathsep + os.path.pathsep.join(append_python_path)

View file

@ -17,7 +17,7 @@ from lib.util import (
parse_to_list_of_dict,
display,
read_lines_without_comments,
INSTALL_ROOT,
ANSIBLE_ROOT,
)
from lib.util_common import (
@ -45,7 +45,7 @@ class RstcheckTest(SanitySingleVersion):
display.warning('Skipping rstcheck on unsupported Python version %s.' % args.python_version)
return SanitySkipped(self.name)
ignore_file = os.path.join(INSTALL_ROOT, 'test/sanity/rstcheck/ignore-substitutions.txt')
ignore_file = os.path.join(ANSIBLE_ROOT, 'test/sanity/rstcheck/ignore-substitutions.txt')
ignore_substitutions = sorted(set(read_lines_without_comments(ignore_file, remove_blank_lines=True)))
paths = sorted(i.path for i in targets.include if os.path.splitext(i.path)[1] in ('.rst',))

View file

@ -4,10 +4,6 @@ __metaclass__ = type
import os
from lib.util import (
INSTALL_ROOT,
)
from lib.sanity import (
SanitySingleVersion,
SanityMessage,
@ -20,9 +16,15 @@ from lib.config import (
SanityConfig,
)
from lib.data import (
data_context,
)
class SanityDocsTest(SanitySingleVersion):
"""Sanity test for documentation of sanity tests."""
ansible_only = True
# noinspection PyUnusedLocal
def test(self, args, targets): # pylint: disable=locally-disabled, unused-argument
"""
@ -30,8 +32,9 @@ class SanityDocsTest(SanitySingleVersion):
:type targets: SanityTargets
:rtype: TestResult
"""
sanity_dir = os.path.join(INSTALL_ROOT, 'docs/docsite/rst/dev_guide/testing/sanity')
sanity_docs = set(part[0] for part in (os.path.splitext(name) for name in os.listdir(sanity_dir)) if part[1] == '.rst')
sanity_dir = 'docs/docsite/rst/dev_guide/testing/sanity'
sanity_docs = set(part[0] for part in (os.path.splitext(os.path.basename(path)) for path in data_context().content.get_files(sanity_dir))
if part[1] == '.rst')
sanity_tests = set(sanity_test.name for sanity_test in sanity_get_tests())
missing = sanity_tests - sanity_docs

View file

@ -20,7 +20,7 @@ from lib.util import (
SubprocessError,
display,
read_lines_without_comments,
INSTALL_ROOT,
ANSIBLE_ROOT,
)
from lib.util_common import (
@ -40,6 +40,10 @@ from lib.test import (
calculate_best_confidence,
)
from lib.data import (
data_context,
)
VALIDATE_SKIP_PATH = 'test/sanity/validate-modules/skip.txt'
VALIDATE_IGNORE_PATH = 'test/sanity/validate-modules/ignore.txt'
@ -61,6 +65,13 @@ class ValidateModulesTest(SanitySingleVersion):
display.warning('Skipping validate-modules on unsupported Python version %s.' % args.python_version)
return SanitySkipped(self.name)
if data_context().content.is_ansible:
ignore_codes = ()
else:
ignore_codes = ((
'E502', # only ansible content requires __init__.py for module subdirectories
))
skip_paths = read_lines_without_comments(VALIDATE_SKIP_PATH, optional=True)
skip_paths_set = set(skip_paths)
@ -73,7 +84,7 @@ class ValidateModulesTest(SanitySingleVersion):
cmd = [
args.python_executable,
os.path.join(INSTALL_ROOT, 'test/sanity/validate-modules/validate-modules'),
os.path.join(ANSIBLE_ROOT, 'test/sanity/validate-modules/validate-modules'),
'--format', 'json',
'--arg-spec',
] + paths
@ -138,6 +149,8 @@ class ValidateModulesTest(SanitySingleVersion):
filtered = []
errors = [error for error in errors if error.code not in ignore_codes]
for error in errors:
if error.code in ignore[error.path]:
ignore[error.path][error.code] = 0 # error ignored, clear line number of ignore entry to track usage

View file

@ -16,7 +16,8 @@ from lib.sanity import (
from lib.util import (
SubprocessError,
display,
INSTALL_ROOT,
ANSIBLE_ROOT,
is_subdir,
)
from lib.util_common import (
@ -27,6 +28,10 @@ from lib.config import (
SanityConfig,
)
from lib.data import (
data_context,
)
class YamllintTest(SanitySingleVersion):
"""Sanity test using yamllint."""
@ -38,20 +43,17 @@ class YamllintTest(SanitySingleVersion):
"""
paths = [
[i.path for i in targets.include if os.path.splitext(i.path)[1] in ('.yml', '.yaml')],
[i.path for i in targets.include if os.path.splitext(i.path)[1] == '.py' and
os.path.basename(i.path) != '__init__.py' and
i.path.startswith('lib/ansible/plugins/')],
[i.path for i in targets.include if os.path.splitext(i.path)[1] == '.py' and
os.path.basename(i.path) != '__init__.py' and
i.path.startswith('lib/ansible/modules/')],
[i.path for i in targets.include if os.path.splitext(i.path)[1] == '.py' and
os.path.basename(i.path) != '__init__.py' and
i.path.startswith('lib/ansible/plugins/doc_fragments/')],
]
for plugin_type, plugin_path in sorted(data_context().content.plugin_paths.items()):
if plugin_type == 'module_utils':
continue
paths.append([target.path for target in targets.include if
os.path.splitext(target.path)[1] == '.py' and
os.path.basename(target.path) != '__init__.py' and
is_subdir(target.path, plugin_path)])
paths = [sorted(p) for p in paths if p]
if not paths:
@ -76,7 +78,7 @@ class YamllintTest(SanitySingleVersion):
"""
cmd = [
args.python_executable,
os.path.join(INSTALL_ROOT, 'test/sanity/yamllint/yamllinter.py'),
os.path.join(ANSIBLE_ROOT, 'test/sanity/yamllint/yamllinter.py'),
]
data = '\n'.join(paths)

View file

@ -8,13 +8,17 @@ import re
import errno
import itertools
import abc
import sys
from lib.util import (
ApplicationError,
display,
read_lines_without_comments,
is_subdir,
to_text,
)
from lib.data import (
data_context,
)
MODULE_EXTENSIONS = '.py', '.ps1'
@ -28,8 +32,6 @@ def find_target_completion(target_func, prefix):
"""
try:
targets = target_func()
if sys.version_info[0] == 2:
prefix = prefix.encode()
short = os.environ.get('COMP_TYPE') == '63' # double tab completion from bash
matches = walk_completion_targets(targets, prefix, short)
return matches
@ -145,7 +147,7 @@ def walk_module_targets():
"""
:rtype: collections.Iterable[TestTarget]
"""
for target in walk_test_targets(path='lib/ansible/modules', module_path='lib/ansible/modules/', extensions=MODULE_EXTENSIONS):
for target in walk_test_targets(path=data_context().content.module_path, module_path=data_context().content.module_path, extensions=MODULE_EXTENSIONS):
if not target.module:
continue
@ -156,21 +158,21 @@ def walk_units_targets():
"""
:rtype: collections.Iterable[TestTarget]
"""
return walk_test_targets(path='test/units', module_path='test/units/modules/', extensions=('.py',), prefix='test_')
return walk_test_targets(path=data_context().content.unit_path, module_path=data_context().content.unit_module_path, extensions=('.py',), prefix='test_')
def walk_compile_targets():
"""
:rtype: collections.Iterable[TestTarget]
"""
return walk_test_targets(module_path='lib/ansible/modules/', extensions=('.py',), extra_dirs=('bin',))
return walk_test_targets(module_path=data_context().content.module_path, extensions=('.py',), extra_dirs=('bin',))
def walk_sanity_targets():
"""
:rtype: collections.Iterable[TestTarget]
"""
return walk_test_targets(module_path='lib/ansible/modules/')
return walk_test_targets(module_path=data_context().content.module_path)
def walk_posix_integration_targets(include_hidden=False):
@ -209,7 +211,7 @@ def walk_integration_targets():
"""
path = 'test/integration/targets'
modules = frozenset(target.module for target in walk_module_targets())
paths = sorted(path for path in [os.path.join(path, p) for p in os.listdir(path)] if os.path.isdir(path))
paths = data_context().content.get_dirs(path)
prefixes = load_integration_prefixes()
for path in paths:
@ -221,7 +223,7 @@ def load_integration_prefixes():
:rtype: dict[str, str]
"""
path = 'test/integration'
file_paths = sorted(os.path.join(path, f) for f in os.listdir(path) if os.path.splitext(f)[0] == 'target-prefixes')
file_paths = sorted(f for f in data_context().content.get_files(path) if os.path.splitext(os.path.basename(f))[0] == 'target-prefixes')
prefixes = {}
for file_path in file_paths:
@ -241,26 +243,10 @@ def walk_test_targets(path=None, module_path=None, extensions=None, prefix=None,
:type extra_dirs: tuple[str] | None
:rtype: collections.Iterable[TestTarget]
"""
file_paths = []
for root, _dir_names, file_names in os.walk(path or '.', topdown=False):
if root.endswith('/__pycache__'):
continue
if '/.tox/' in root:
continue
if path is None:
root = root[2:]
if root.startswith('.') and root != '.github':
continue
for file_name in file_names:
if file_name.startswith('.'):
continue
file_paths.append(os.path.join(root, file_name))
if path:
file_paths = data_context().content.walk_files(path)
else:
file_paths = data_context().content.all_files()
for file_path in file_paths:
name, ext = os.path.splitext(os.path.basename(file_path))
@ -282,12 +268,7 @@ def walk_test_targets(path=None, module_path=None, extensions=None, prefix=None,
if extra_dirs:
for extra_dir in extra_dirs:
file_names = os.listdir(extra_dir)
for file_name in file_names:
file_path = os.path.join(extra_dir, file_name)
if os.path.isfile(file_path):
for file_path in data_context().content.get_files(extra_dir):
file_paths.append(file_path)
for file_path in file_paths:
@ -322,13 +303,7 @@ def analyze_integration_target_dependencies(integration_targets):
# handle symlink dependencies between targets
# this use case is supported, but discouraged
for target in integration_targets:
paths = []
for root, _dummy, file_names in os.walk(target.path):
for name in file_names:
paths.append(os.path.join(root, name))
for path in paths:
for path in data_context().content.walk_files(target.path):
if not os.path.islink(path):
continue
@ -352,14 +327,14 @@ def analyze_integration_target_dependencies(integration_targets):
if not os.path.isdir(meta_dir):
continue
meta_paths = sorted([os.path.join(meta_dir, name) for name in os.listdir(meta_dir)])
meta_paths = data_context().content.get_files(meta_dir)
for meta_path in meta_paths:
if os.path.exists(meta_path):
with open(meta_path, 'rb') as meta_fd:
# try and decode the file as a utf-8 string, skip if it contains invalid chars (binary file)
try:
meta_lines = meta_fd.read().decode('utf-8').splitlines()
meta_lines = to_text(meta_fd.read()).splitlines()
except UnicodeDecodeError:
continue
@ -517,7 +492,7 @@ class IntegrationTarget(CompletionTarget):
# script_path and type
contents = sorted(os.listdir(path))
contents = [os.path.basename(p) for p in data_context().content.get_files(path)]
runme_files = tuple(c for c in contents if os.path.splitext(c)[0] == 'runme')
test_files = tuple(c for c in contents if os.path.splitext(c)[0] == 'test')

View file

@ -11,6 +11,7 @@ import lib.types as t
from lib.util import (
display,
make_dirs,
to_bytes,
)
from lib.config import (
@ -155,7 +156,7 @@ class TestResult:
return
with open(path, 'wb') as xml:
xml.write(report.encode('utf-8', 'strict'))
xml.write(to_bytes(report))
class TestTimeout(TestResult):

View file

@ -5,6 +5,7 @@ __metaclass__ = type
try:
from typing import (
Any,
AnyStr,
Callable,
Dict,
FrozenSet,
@ -12,6 +13,7 @@ try:
List,
Optional,
Set,
Text,
Tuple,
Type,
TypeVar,

View file

@ -62,7 +62,7 @@ except AttributeError:
COVERAGE_CONFIG_PATH = '.coveragerc'
COVERAGE_OUTPUT_PATH = 'coverage'
INSTALL_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
ANSIBLE_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
# Modes are set to allow all users the same level of access.
# This permits files to be used in tests that change users.
@ -78,6 +78,42 @@ MODE_FILE_WRITE = MODE_FILE | stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
MODE_DIRECTORY = MODE_READ | stat.S_IWUSR | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
MODE_DIRECTORY_WRITE = MODE_DIRECTORY | stat.S_IWGRP | stat.S_IWOTH
ENCODING = 'utf-8'
Text = type(u'')
def to_optional_bytes(value, errors='strict'): # type: (t.Optional[t.AnyStr], str) -> t.Optional[bytes]
"""Return the given value as bytes encoded using UTF-8 if not already bytes, or None if the value is None."""
return None if value is None else to_bytes(value, errors)
def to_optional_text(value, errors='strict'): # type: (t.Optional[t.AnyStr], str) -> t.Optional[t.Text]
"""Return the given value as text decoded using UTF-8 if not already text, or None if the value is None."""
return None if value is None else to_text(value, errors)
def to_bytes(value, errors='strict'): # type: (t.AnyStr, str) -> bytes
"""Return the given value as bytes encoded using UTF-8 if not already bytes."""
if isinstance(value, bytes):
return value
if isinstance(value, Text):
return value.encode(ENCODING, errors)
raise Exception('value is not bytes or text: %s' % type(value))
def to_text(value, errors='strict'): # type: (t.AnyStr, str) -> t.Text
"""Return the given value as text decoded using UTF-8 if not already text."""
if isinstance(value, bytes):
return value.decode(ENCODING, errors)
if isinstance(value, Text):
return value
raise Exception('value is not bytes or text: %s' % type(value))
def get_docker_completion():
"""
@ -100,7 +136,7 @@ def get_parameterized_completion(cache, name):
:rtype: dict[str, dict[str, str]]
"""
if not cache:
images = read_lines_without_comments(os.path.join(INSTALL_ROOT, 'test/runner/completion/%s.txt' % name), remove_blank_lines=True)
images = read_lines_without_comments(os.path.join(ANSIBLE_ROOT, 'test/runner/completion/%s.txt' % name), remove_blank_lines=True)
cache.update(dict(kvp for kvp in [parse_parameterized_completion(i) for i in images] if kvp))
@ -297,21 +333,19 @@ def raw_command(cmd, capture=False, env=None, data=None, cwd=None, explain=False
try:
try:
process = subprocess.Popen(cmd, env=env, stdin=stdin, stdout=stdout, stderr=stderr, cwd=cwd)
cmd_bytes = [to_bytes(c) for c in cmd]
env_bytes = dict((to_bytes(k), to_bytes(v)) for k, v in env.items())
process = subprocess.Popen(cmd_bytes, env=env_bytes, stdin=stdin, stdout=stdout, stderr=stderr, cwd=cwd)
except OSError as ex:
if ex.errno == errno.ENOENT:
raise ApplicationError('Required program "%s" not found.' % cmd[0])
raise
if communicate:
encoding = 'utf-8'
if data is None or isinstance(data, bytes):
data_bytes = data
else:
data_bytes = data.encode(encoding, 'surrogateescape')
data_bytes = to_optional_bytes(data)
stdout_bytes, stderr_bytes = process.communicate(data_bytes)
stdout_text = stdout_bytes.decode(encoding, str_errors) if stdout_bytes else u''
stderr_text = stderr_bytes.decode(encoding, str_errors) if stderr_bytes else u''
stdout_text = to_optional_text(stdout_bytes, str_errors) or u''
stderr_text = to_optional_text(stderr_bytes, str_errors) or u''
else:
process.wait()
stdout_text, stderr_text = None, None
@ -418,7 +452,7 @@ def remove_tree(path):
:type path: str
"""
try:
shutil.rmtree(path)
shutil.rmtree(to_bytes(path))
except OSError as ex:
if ex.errno != errno.ENOENT:
raise
@ -429,7 +463,7 @@ def make_dirs(path):
:type path: str
"""
try:
os.makedirs(path)
os.makedirs(to_bytes(path))
except OSError as ex:
if ex.errno != errno.EEXIST:
raise
@ -532,7 +566,7 @@ class Display:
def __init__(self):
self.verbosity = 0
self.color = True
self.color = sys.stdout.isatty()
self.warnings = []
self.warnings_unique = set()
self.info_stderr = False
@ -617,8 +651,8 @@ class Display:
message = message.replace(self.clear, color)
message = '%s%s%s' % (color, message, self.clear)
if sys.version_info[0] == 2 and isinstance(message, type(u'')):
message = message.encode('utf-8')
if sys.version_info[0] == 2:
message = to_bytes(message)
print(message, file=fd)
fd.flush()

View file

@ -15,12 +15,13 @@ from lib.util import (
COVERAGE_OUTPUT_PATH,
display,
find_python,
INSTALL_ROOT,
ANSIBLE_ROOT,
is_shippable,
MODE_DIRECTORY,
MODE_FILE_EXECUTE,
PYTHON_PATHS,
raw_command,
to_bytes,
)
@ -56,14 +57,11 @@ def named_temporary_file(args, prefix, suffix, directory, content):
:param content: str | bytes | unicode
:rtype: str
"""
if not isinstance(content, bytes):
content = content.encode('utf-8')
if args.explain:
yield os.path.join(directory, '%stemp%s' % (prefix, suffix))
else:
with tempfile.NamedTemporaryFile(prefix=prefix, suffix=suffix, dir=directory) as tempfile_fd:
tempfile_fd.write(content)
tempfile_fd.write(to_bytes(content))
tempfile_fd.flush()
yield tempfile_fd.name
@ -159,7 +157,7 @@ def get_coverage_environment(args, target_name, version, temp_path, module_cover
else:
# unit tests, sanity tests and other special cases (localhost only)
# config and results are in the source tree
coverage_config_base_path = args.coverage_config_base_path or INSTALL_ROOT
coverage_config_base_path = args.coverage_config_base_path or ANSIBLE_ROOT
coverage_output_base_path = os.path.abspath(os.path.join('test/results'))
config_file = os.path.join(coverage_config_base_path, COVERAGE_CONFIG_PATH)
@ -212,7 +210,7 @@ def intercept_command(args, cmd, target_name, env, capture=False, data=None, cwd
cmd = list(cmd)
version = python_version or args.python_version
interpreter = virtualenv or find_python(version)
inject_path = os.path.join(INSTALL_ROOT, 'test/runner/injector')
inject_path = os.path.join(ANSIBLE_ROOT, 'test/runner/injector')
if not virtualenv:
# injection of python into the path is required when not activating a virtualenv

View file

@ -0,0 +1,8 @@
azure-requirements.py
botmeta.py
changelog.py
configure-remoting-ps1.py
deprecated-config.py
docs-build.py
test-constraints.py
update-bundled.py

View file

@ -0,0 +1,133 @@
[MESSAGES CONTROL]
disable=
abstract-method,
access-member-before-definition,
ansible-deprecated-version,
arguments-differ,
assignment-from-no-return,
assignment-from-none,
attribute-defined-outside-init,
bad-continuation,
bad-indentation,
bad-mcs-classmethod-argument,
broad-except,
c-extension-no-member,
cell-var-from-loop,
chained-comparison,
comparison-with-callable,
consider-iterating-dictionary,
consider-merging-isinstance,
consider-using-dict-comprehension,
consider-using-enumerate,
consider-using-get,
consider-using-in,
consider-using-set-comprehension,
consider-using-ternary,
deprecated-lambda,
deprecated-method,
deprecated-module,
eval-used,
exec-used,
expression-not-assigned,
fixme,
function-redefined,
global-statement,
global-variable-undefined,
import-self,
inconsistent-return-statements,
invalid-envvar-default,
invalid-name,
invalid-sequence-index,
keyword-arg-before-vararg,
len-as-condition,
line-too-long,
literal-comparison,
locally-disabled,
method-hidden,
misplaced-comparison-constant,
missing-docstring,
no-else-raise,
no-else-return,
no-init,
no-member,
no-name-in-module,
no-self-use,
no-value-for-parameter,
non-iterator-returned,
not-a-mapping,
not-an-iterable,
not-callable,
old-style-class,
pointless-statement,
pointless-string-statement,
possibly-unused-variable,
protected-access,
redefined-argument-from-local,
redefined-builtin,
redefined-outer-name,
redefined-variable-type,
reimported,
relative-beyond-top-level, # https://github.com/PyCQA/pylint/issues/2967
signature-differs,
simplifiable-if-expression,
simplifiable-if-statement,
subprocess-popen-preexec-fn,
super-init-not-called,
superfluous-parens,
too-few-public-methods,
too-many-ancestors,
too-many-arguments,
too-many-boolean-expressions,
too-many-branches,
too-many-function-args,
too-many-instance-attributes,
too-many-lines,
too-many-locals,
too-many-nested-blocks,
too-many-public-methods,
too-many-return-statements,
too-many-statements,
trailing-comma-tuple,
trailing-comma-tuple,
try-except-raise,
unbalanced-tuple-unpacking,
undefined-loop-variable,
unexpected-keyword-arg,
ungrouped-imports,
unidiomatic-typecheck,
unnecessary-pass,
unsubscriptable-object,
unsupported-assignment-operation,
unsupported-delete-operation,
unsupported-membership-test,
unused-argument,
unused-import,
unused-variable,
used-before-assignment,
useless-object-inheritance,
useless-return,
useless-super-delegation,
wrong-import-order,
wrong-import-position,
[BASIC]
bad-names=foo,
bar,
baz,
toto,
tutu,
tata,
_,
good-names=i,
j,
k,
ex,
Run,
[TYPECHECK]
ignored-modules=
_MovedItems,

View file

@ -53,10 +53,10 @@ class YamlChecker:
if extension in ('.yml', '.yaml'):
self.check_yaml(yaml_conf, path, contents)
elif extension == '.py':
if path.startswith('lib/ansible/plugins/'):
conf = plugin_conf
else:
if path.startswith('lib/ansible/modules/') or path.startswith('plugins/modules/'):
conf = module_conf
else:
conf = plugin_conf
self.check_module(conf, path, contents)
else:
@ -145,11 +145,14 @@ class YamlChecker:
if not module_ast:
return {}
if path.startswith('lib/ansible/modules/') or path.startswith('lib/ansible/plugins/'):
is_plugin = path.startswith('lib/ansible/modules/') or path.startswith('lib/ansible/plugins/') or path.startswith('plugins/')
is_doc_fragment = path.startswith('lib/ansible/plugins/doc_fragments/') or path.startswith('plugins/doc_fragments/')
if is_plugin and not is_doc_fragment:
for body_statement in module_ast.body:
if isinstance(body_statement, ast.Assign):
check_assignment(body_statement, module_doc_types)
elif path.startswith('lib/ansible/plugins/doc_fragments/'):
elif is_doc_fragment:
for body_statement in module_ast.body:
if isinstance(body_statement, ast.ClassDef):
for class_statement in body_statement.body:

View file

@ -0,0 +1,30 @@
"""Enable unit testing of Ansible collections."""
from __future__ import (absolute_import, division, print_function)
import os
import sys
# set by ansible-test to a single directory, rather than a list of directories as supported by Ansible itself
ANSIBLE_COLLECTIONS_PATH = os.path.join(os.environ['ANSIBLE_COLLECTIONS_PATHS'], 'ansible_collections')
def collection_pypkgpath(self):
for parent in self.parts(reverse=True):
if str(parent) == ANSIBLE_COLLECTIONS_PATH:
return parent
raise Exception('File "%s" not found in collection path "%s".' % (self.strpath, ANSIBLE_COLLECTIONS_PATH))
def pytest_configure():
from ansible.utils.collection_loader import AnsibleCollectionLoader
# allow unit tests to import code from collections
sys.meta_path.insert(0, AnsibleCollectionLoader())
import py._path.local
# force collections unit tests to be loaded with the ansible_collections namespace
# original idea from https://stackoverflow.com/questions/50174130/how-do-i-pytest-a-project-using-pep-420-namespace-packages/50175552#50175552
py._path.local.LocalPath.pypkgpath = collection_pypkgpath