ansible/test/lib/ansible_test/_internal/import_analysis.py

304 lines
11 KiB
Python
Raw Normal View History

"""Analyze python import statements."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import ast
import os
from . import types as t
from .io import (
read_binary_file,
)
from .util import (
display,
ApplicationError,
is_subdir,
)
from .data import (
data_context,
)
# Packages whose contents are handled as "virtual" module_utils.
# Any module_utils name found beneath one of these packages is removed from the regular module_utils set
# and later reports the same importing paths as its parent package.
# A virtual package is treated as depending on the modules it contains, instead of the reverse.
VIRTUAL_PACKAGES = set([
'ansible.module_utils.six',
])
def get_python_module_utils_imports(compile_targets):
    """Return a dictionary mapping each module_utils name to the set of python file paths which import it.
    Imports made indirectly, through other module_utils, are included.
    :type compile_targets: list[TestTarget]
    :rtype: dict[str, set[str]]
    """
    module_utils = enumerate_module_utils()

    # names found beneath a virtual package are tracked separately from regular module_utils
    virtual_prefixes = tuple('%s.' % package for package in VIRTUAL_PACKAGES)
    virtual_utils = set(name for name in module_utils if name.startswith(virtual_prefixes))
    module_utils.difference_update(virtual_utils)

    # direct imports found in each compile target, keyed by the target path
    imports_by_target_path = dict(
        (target.path, extract_python_module_utils_imports(target.path, module_utils))
        for target in compile_targets
    )

    def recurse_import(import_name, depth=0, seen=None):  # type: (str, int, t.Optional[t.Set[str]]) -> t.Set[str]
        """Return the given import plus every module_utils import reachable from it which has not already been seen."""
        display.info('module_utils import: %s%s' % (' ' * depth, import_name), verbosity=4)

        if seen is None:
            seen = set([import_name])

        results = set([import_name])

        def follow(candidate):  # type: (str) -> None
            """Expand the candidate one level deeper, unless it was visited earlier in this traversal."""
            if candidate in seen:
                return

            seen.add(candidate)
            results.update(recurse_import(candidate, depth + 1, seen))

        # virtual packages depend on the modules they contain instead of the reverse
        if import_name in VIRTUAL_PACKAGES:
            child_prefix = '%s.' % import_name

            for sub_import in sorted(virtual_utils):
                if sub_import.startswith(child_prefix):
                    follow(sub_import)

        # try the import as a module first, then fall back to treating it as a package
        import_path = get_import_path(import_name)

        if import_path not in imports_by_target_path:
            import_path = get_import_path(import_name, package=True)

        if import_path not in imports_by_target_path:
            raise ApplicationError('Cannot determine path for module_utils import: %s' % import_name)

        # process imports in reverse so the deepest imports come first
        for name in sorted(imports_by_target_path[import_path], reverse=True):
            if name not in virtual_utils:
                follow(name)

        return results

    for module_util in module_utils:
        # recurse over module_utils imports while excluding self
        inherited = recurse_import(module_util)
        inherited.remove(module_util)
        inherited_in_order = sorted(inherited)

        # add recursive imports to all path entries which import this module_util
        for target_path, target_imports in imports_by_target_path.items():
            if module_util not in target_imports:
                continue

            for inherited_name in inherited_in_order:
                if inherited_name in target_imports:
                    continue

                display.info('%s inherits import %s via %s' % (target_path, inherited_name, module_util), verbosity=6)
                target_imports.add(inherited_name)

    # invert the mapping: module_util name -> paths which import it
    imports = dict((name, set()) for name in module_utils | virtual_utils)

    for target_path, target_imports in imports_by_target_path.items():
        for name in target_imports:
            imports[name].add(target_path)

    # for purposes of mapping module_utils to paths, treat imports of virtual utils the same as the parent package
    for virtual_util in virtual_utils:
        parent_package = virtual_util.rpartition('.')[0]
        imports[virtual_util] = imports[parent_package]
        display.info('%s reports imports from parent package %s' % (virtual_util, parent_package), verbosity=6)

    for module_util in sorted(imports):
        if imports[module_util]:
            continue

        package_path = get_import_path(module_util, package=True)

        if os.path.exists(package_path) and not os.path.getsize(package_path):
            continue  # ignore empty __init__.py files

        display.warning('No imports found which use the "%s" module_util.' % module_util)

    return imports
def get_python_module_utils_name(path):  # type: (str) -> str
    """Return the dotted module_utils import name which corresponds to the given module_utils path."""
    content = data_context().content
    base_path = content.module_utils_path
    collection = content.collection

    if collection:
        prefix = 'ansible_collections.' + collection.prefix + 'plugins.module_utils'
    else:
        prefix = 'ansible.module_utils'

    # a package is named after its directory rather than its __init__.py file
    if path.endswith('/__init__.py'):
        path = os.path.dirname(path)

    # the module_utils directory itself maps directly to the prefix
    if path == base_path:
        return prefix

    relative_path = os.path.relpath(path, base_path)
    dotted_name = os.path.splitext(relative_path)[0].replace(os.path.sep, '.')

    return prefix + '.' + dotted_name
def enumerate_module_utils():
    """Return the set of available module_utils import names.
    Only files with a .py extension found under the module_utils path are considered.
    :rtype: set[str]
    """
    content = data_context().content

    return set(
        get_python_module_utils_name(file_path)
        for file_path in content.walk_files(content.module_utils_path)
        if os.path.splitext(file_path)[1] == '.py'
    )
def extract_python_module_utils_imports(path, module_utils):
    """Return the set of module_utils imports found in the specified source file.
    An empty set is returned, and a warning displayed, when the file cannot be parsed.
    :type path: str
    :type module_utils: set[str]
    :rtype: set[str]
    """
    # Python code must be read as bytes to avoid a SyntaxError when the source uses comments to declare the file encoding.
    # See: https://www.python.org/dev/peps/pep-0263
    # Specifically: If a Unicode string with a coding declaration is passed to compile(), a SyntaxError will be raised.
    source = read_binary_file(path)

    try:
        syntax_tree = ast.parse(source)
    except SyntaxError as error:
        # Treat this error as a warning so tests can be executed as best as possible.
        # The compile test will detect and report this syntax error.
        display.warning('%s:%s Syntax error extracting module_utils imports: %s' % (path, error.lineno, error.msg))
        return set()

    visitor = ModuleUtilFinder(path, module_utils)
    visitor.visit(syntax_tree)

    return visitor.imports
def get_import_path(name, package=False):  # type: (str, bool) -> str
    """Return the relative file path for an import name, as a module by default or as a package __init__.py when requested.
    An Exception is raised for names outside ansible.module_utils and the module_utils of the collection under test.
    """
    relative_name = name.replace('.', '/')

    if package:
        filename = os.path.join(relative_name, '__init__.py')
    else:
        filename = '%s.py' % relative_name

    # ansible core module_utils live under the lib directory
    if name == 'ansible.module_utils' or name.startswith('ansible.module_utils.'):
        return os.path.join('lib', filename)

    collection = data_context().content.collection

    if collection:
        collection_root = 'ansible_collections.%s.plugins.module_utils' % collection.full_name

        if name == collection_root or name.startswith('%s.' % collection_root):
            # drop the first three path components (ansible_collections/{ns}/{col})
            return '/'.join(filename.split('/')[3:])

    raise Exception('Unexpected import name: %s' % name)
class ModuleUtilFinder(ast.NodeVisitor):
    """AST visitor which collects the valid module_utils imports made by a single source file."""

    def __init__(self, path, module_utils):
        """Initialize the finder for the given source file and set of known module_utils names.
        :type path: str
        :type module_utils: set[str]
        """
        self.path = path
        self.module_utils = module_utils
        self.imports = set()

        # implicitly import parent package
        if path.endswith('/__init__.py'):
            path = os.path.dirname(path)

        if not path.startswith('lib/ansible/module_utils/'):
            return

        # convert the containing directory to a dotted name and strip the leading 'lib.'
        package = os.path.dirname(path).replace('/', '.')[4:]

        if package == 'ansible.module_utils' or package in VIRTUAL_PACKAGES:
            return

        self.add_import(package, 0)

    # noinspection PyPep8Naming
    # pylint: disable=locally-disabled, invalid-name
    def visit_Import(self, node):
        """
        :type node: ast.Import
        """
        self.generic_visit(node)

        # import ansible.module_utils.MODULE[.MODULE]
        # import ansible_collections.{ns}.{col}.plugins.module_utils.MODULE[.MODULE]
        imported_names = [alias.name for alias in node.names]
        self.add_imports(imported_names, node.lineno)

    # noinspection PyPep8Naming
    # pylint: disable=locally-disabled, invalid-name
    def visit_ImportFrom(self, node):
        """
        :type node: ast.ImportFrom
        """
        self.generic_visit(node)

        module = node.module

        # skip imports without a module name and anything which cannot be an ansible import
        if not module or not module.startswith('ansible'):
            return

        # from ansible.module_utils import MODULE[, MODULE]
        # from ansible.module_utils.MODULE[.MODULE] import MODULE[, MODULE]
        # from ansible_collections.{ns}.{col}.plugins.module_utils import MODULE[, MODULE]
        # from ansible_collections.{ns}.{col}.plugins.module_utils.MODULE[.MODULE] import MODULE[, MODULE]
        imported_names = ['%s.%s' % (module, alias.name) for alias in node.names]
        self.add_imports(imported_names, node.lineno)

    def add_import(self, name, line_number):
        """Record the longest known module_utils name which matches the given import, or warn when none matches.
        :type name: str
        :type line_number: int
        """
        candidate = name

        # strip trailing name components until a known module_util is found
        while self.is_module_util_name(candidate):
            if candidate not in self.module_utils:
                candidate = candidate.rpartition('.')[0]
                continue

            if candidate not in self.imports:
                display.info('%s:%d imports module_utils: %s' % (self.path, line_number, candidate), verbosity=5)
                self.imports.add(candidate)

            return  # duplicate imports are ignored

        if is_subdir(self.path, data_context().content.test_path):
            return  # invalid imports in tests are ignored

        # Treat this error as a warning so tests can be executed as best as possible.
        # This error should be detected by unit or integration tests.
        display.warning('%s:%d Invalid module_utils import: %s' % (self.path, line_number, name))

    def add_imports(self, names, line_no):  # type: (t.List[str], int) -> None
        """Add each of the given import names which is a module_utils import."""
        for name in names:
            if not self.is_module_util_name(name):
                continue

            self.add_import(name, line_no)

    @staticmethod
    def is_module_util_name(name):  # type: (str) -> bool
        """Return True if the given name is a module_util name for the content under test. External module_utils are ignored."""
        content = data_context().content

        if content.is_ansible and name.startswith('ansible.module_utils.'):
            return True

        collection = content.collection

        return bool(collection and name.startswith('ansible_collections.%s.plugins.module_utils.' % collection.full_name))