10fe54de58
Now empty `*.py` files are ignored during module_utils import analysis for change detection. This eliminates "No imports found" warnings for files which should have no imports.
296 lines
10 KiB
Python
296 lines
10 KiB
Python
"""Analyze python import statements."""
|
|
from __future__ import (absolute_import, division, print_function)
|
|
__metaclass__ = type
|
|
|
|
import ast
|
|
import os
|
|
|
|
from . import types as t
|
|
|
|
from .io import (
|
|
read_binary_file,
|
|
)
|
|
|
|
from .util import (
|
|
display,
|
|
ApplicationError,
|
|
is_subdir,
|
|
)
|
|
|
|
from .data import (
|
|
data_context,
|
|
)
|
|
|
|
VIRTUAL_PACKAGES = set([
|
|
'ansible.module_utils.six',
|
|
])
|
|
|
|
|
|
def get_python_module_utils_imports(compile_targets):
|
|
"""Return a dictionary of module_utils names mapped to sets of python file paths.
|
|
:type compile_targets: list[TestTarget]
|
|
:rtype: dict[str, set[str]]
|
|
"""
|
|
|
|
module_utils = enumerate_module_utils()
|
|
|
|
virtual_utils = set(m for m in module_utils if any(m.startswith('%s.' % v) for v in VIRTUAL_PACKAGES))
|
|
module_utils -= virtual_utils
|
|
|
|
imports_by_target_path = {}
|
|
|
|
for target in compile_targets:
|
|
imports_by_target_path[target.path] = extract_python_module_utils_imports(target.path, module_utils)
|
|
|
|
def recurse_import(import_name, depth=0, seen=None): # type: (str, int, t.Optional[t.Set[str]]) -> t.Set[str]
|
|
"""Recursively expand module_utils imports from module_utils files."""
|
|
display.info('module_utils import: %s%s' % (' ' * depth, import_name), verbosity=4)
|
|
|
|
if seen is None:
|
|
seen = set([import_name])
|
|
|
|
results = set([import_name])
|
|
|
|
# virtual packages depend on the modules they contain instead of the reverse
|
|
if import_name in VIRTUAL_PACKAGES:
|
|
for sub_import in sorted(virtual_utils):
|
|
if sub_import.startswith('%s.' % import_name):
|
|
if sub_import in seen:
|
|
continue
|
|
|
|
seen.add(sub_import)
|
|
|
|
matches = sorted(recurse_import(sub_import, depth + 1, seen))
|
|
|
|
for result in matches:
|
|
results.add(result)
|
|
|
|
import_path = get_import_path(import_name)
|
|
|
|
if import_path not in imports_by_target_path:
|
|
import_path = get_import_path(import_name, package=True)
|
|
|
|
if import_path not in imports_by_target_path:
|
|
raise ApplicationError('Cannot determine path for module_utils import: %s' % import_name)
|
|
|
|
# process imports in reverse so the deepest imports come first
|
|
for name in sorted(imports_by_target_path[import_path], reverse=True):
|
|
if name in virtual_utils:
|
|
continue
|
|
|
|
if name in seen:
|
|
continue
|
|
|
|
seen.add(name)
|
|
|
|
matches = sorted(recurse_import(name, depth + 1, seen))
|
|
|
|
for result in matches:
|
|
results.add(result)
|
|
|
|
return results
|
|
|
|
for module_util in module_utils:
|
|
# recurse over module_utils imports while excluding self
|
|
module_util_imports = recurse_import(module_util)
|
|
module_util_imports.remove(module_util)
|
|
|
|
# add recursive imports to all path entries which import this module_util
|
|
for target_path in imports_by_target_path:
|
|
if module_util in imports_by_target_path[target_path]:
|
|
for module_util_import in sorted(module_util_imports):
|
|
if module_util_import not in imports_by_target_path[target_path]:
|
|
display.info('%s inherits import %s via %s' % (target_path, module_util_import, module_util), verbosity=6)
|
|
imports_by_target_path[target_path].add(module_util_import)
|
|
|
|
imports = dict([(module_util, set()) for module_util in module_utils | virtual_utils])
|
|
|
|
for target_path in imports_by_target_path:
|
|
for module_util in imports_by_target_path[target_path]:
|
|
imports[module_util].add(target_path)
|
|
|
|
# for purposes of mapping module_utils to paths, treat imports of virtual utils the same as the parent package
|
|
for virtual_util in virtual_utils:
|
|
parent_package = '.'.join(virtual_util.split('.')[:-1])
|
|
imports[virtual_util] = imports[parent_package]
|
|
display.info('%s reports imports from parent package %s' % (virtual_util, parent_package), verbosity=6)
|
|
|
|
for module_util in sorted(imports):
|
|
if not imports[module_util]:
|
|
display.warning('No imports found which use the "%s" module_util.' % module_util)
|
|
|
|
return imports
|
|
|
|
|
|
def get_python_module_utils_name(path): # type: (str) -> str
|
|
"""Return a namespace and name from the given module_utils path."""
|
|
base_path = data_context().content.module_utils_path
|
|
|
|
if data_context().content.collection:
|
|
prefix = 'ansible_collections.' + data_context().content.collection.prefix + 'plugins.module_utils.'
|
|
else:
|
|
prefix = 'ansible.module_utils.'
|
|
|
|
if path.endswith('/__init__.py'):
|
|
path = os.path.dirname(path)
|
|
|
|
name = prefix + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.path.sep, '.')
|
|
|
|
return name
|
|
|
|
|
|
def enumerate_module_utils():
|
|
"""Return a list of available module_utils imports.
|
|
:rtype: set[str]
|
|
"""
|
|
module_utils = []
|
|
|
|
for path in data_context().content.walk_files(data_context().content.module_utils_path):
|
|
ext = os.path.splitext(path)[1]
|
|
|
|
if ext != '.py':
|
|
continue
|
|
|
|
if os.path.getsize(path) == 0:
|
|
continue
|
|
|
|
module_utils.append(get_python_module_utils_name(path))
|
|
|
|
return set(module_utils)
|
|
|
|
|
|
def extract_python_module_utils_imports(path, module_utils):
|
|
"""Return a list of module_utils imports found in the specified source file.
|
|
:type path: str
|
|
:type module_utils: set[str]
|
|
:rtype: set[str]
|
|
"""
|
|
# Python code must be read as bytes to avoid a SyntaxError when the source uses comments to declare the file encoding.
|
|
# See: https://www.python.org/dev/peps/pep-0263
|
|
# Specifically: If a Unicode string with a coding declaration is passed to compile(), a SyntaxError will be raised.
|
|
code = read_binary_file(path)
|
|
|
|
try:
|
|
tree = ast.parse(code)
|
|
except SyntaxError as ex:
|
|
# Treat this error as a warning so tests can be executed as best as possible.
|
|
# The compile test will detect and report this syntax error.
|
|
display.warning('%s:%s Syntax error extracting module_utils imports: %s' % (path, ex.lineno, ex.msg))
|
|
return set()
|
|
|
|
finder = ModuleUtilFinder(path, module_utils)
|
|
finder.visit(tree)
|
|
return finder.imports
|
|
|
|
|
|
def get_import_path(name, package=False): # type: (str, bool) -> str
|
|
"""Return a path from an import name."""
|
|
if package:
|
|
filename = os.path.join(name.replace('.', '/'), '__init__.py')
|
|
else:
|
|
filename = '%s.py' % name.replace('.', '/')
|
|
|
|
if name.startswith('ansible.module_utils.'):
|
|
path = os.path.join('lib', filename)
|
|
elif data_context().content.collection and name.startswith('ansible_collections.%s.plugins.module_utils.' % data_context().content.collection.full_name):
|
|
path = '/'.join(filename.split('/')[3:])
|
|
else:
|
|
raise Exception('Unexpected import name: %s' % name)
|
|
|
|
return path
|
|
|
|
|
|
class ModuleUtilFinder(ast.NodeVisitor):
|
|
"""AST visitor to find valid module_utils imports."""
|
|
def __init__(self, path, module_utils):
|
|
"""Return a list of module_utils imports found in the specified source file.
|
|
:type path: str
|
|
:type module_utils: set[str]
|
|
"""
|
|
self.path = path
|
|
self.module_utils = module_utils
|
|
self.imports = set()
|
|
|
|
# implicitly import parent package
|
|
|
|
if path.endswith('/__init__.py'):
|
|
path = os.path.split(path)[0]
|
|
|
|
if path.startswith('lib/ansible/module_utils/'):
|
|
package = os.path.split(path)[0].replace('/', '.')[4:]
|
|
|
|
if package != 'ansible.module_utils' and package not in VIRTUAL_PACKAGES:
|
|
self.add_import(package, 0)
|
|
|
|
# noinspection PyPep8Naming
|
|
# pylint: disable=locally-disabled, invalid-name
|
|
def visit_Import(self, node):
|
|
"""
|
|
:type node: ast.Import
|
|
"""
|
|
self.generic_visit(node)
|
|
|
|
# import ansible.module_utils.MODULE[.MODULE]
|
|
# import ansible_collections.{ns}.{col}.plugins.module_utils.module_utils.MODULE[.MODULE]
|
|
self.add_imports([alias.name for alias in node.names], node.lineno)
|
|
|
|
# noinspection PyPep8Naming
|
|
# pylint: disable=locally-disabled, invalid-name
|
|
def visit_ImportFrom(self, node):
|
|
"""
|
|
:type node: ast.ImportFrom
|
|
"""
|
|
self.generic_visit(node)
|
|
|
|
if not node.module:
|
|
return
|
|
|
|
if not node.module.startswith('ansible'):
|
|
return
|
|
|
|
# from ansible.module_utils import MODULE[, MODULE]
|
|
# from ansible.module_utils.MODULE[.MODULE] import MODULE[, MODULE]
|
|
# from ansible_collections.{ns}.{col}.plugins.module_utils import MODULE[, MODULE]
|
|
# from ansible_collections.{ns}.{col}.plugins.module_utils.MODULE[.MODULE] import MODULE[, MODULE]
|
|
self.add_imports(['%s.%s' % (node.module, alias.name) for alias in node.names], node.lineno)
|
|
|
|
def add_import(self, name, line_number):
|
|
"""
|
|
:type name: str
|
|
:type line_number: int
|
|
"""
|
|
import_name = name
|
|
|
|
while self.is_module_util_name(name):
|
|
if name in self.module_utils:
|
|
if name not in self.imports:
|
|
display.info('%s:%d imports module_utils: %s' % (self.path, line_number, name), verbosity=5)
|
|
self.imports.add(name)
|
|
|
|
return # duplicate imports are ignored
|
|
|
|
name = '.'.join(name.split('.')[:-1])
|
|
|
|
if is_subdir(self.path, data_context().content.test_path):
|
|
return # invalid imports in tests are ignored
|
|
|
|
# Treat this error as a warning so tests can be executed as best as possible.
|
|
# This error should be detected by unit or integration tests.
|
|
display.warning('%s:%d Invalid module_utils import: %s' % (self.path, line_number, import_name))
|
|
|
|
def add_imports(self, names, line_no): # type: (t.List[str], int) -> None
|
|
"""Add the given import names if they are module_utils imports."""
|
|
for name in names:
|
|
if self.is_module_util_name(name):
|
|
self.add_import(name, line_no)
|
|
|
|
@staticmethod
|
|
def is_module_util_name(name): # type: (str) -> bool
|
|
"""Return True if the given name is a module_util name for the content under test. External module_utils are ignored."""
|
|
if data_context().content.is_ansible and name.startswith('ansible.module_utils.'):
|
|
return True
|
|
|
|
if data_context().content.collection and name.startswith('ansible_collections.%s.plugins.module_utils.' % data_context().content.collection.full_name):
|
|
return True
|
|
|
|
return False
|