Clean up code in ansible-test. (#73379)
* Relocate code to fix type dependencies. * Fix missing and unused imports. * Fix type hints. * Suppress PyCharm false positives. * Avoid shadowing `file` built-in. * Use json.JSONEncoder directly instead of super(). This matches the recommended usage and avoids a PyCharm warning. * Remove redundant regex escape. * Remove redundant find_python call. * Use tarfile.open directly. * Add changelog fragment.
This commit is contained in:
parent
76604397cb
commit
73fadc5e97
21 changed files with 66 additions and 51 deletions
changelogs/fragments
test/lib/ansible_test
_data
_internal
2
changelogs/fragments/ansible-test-more-code-cleanup.yml
Normal file
2
changelogs/fragments/ansible-test-more-code-cleanup.yml
Normal file
|
@ -0,0 +1,2 @@
|
|||
minor_changes:
|
||||
- ansible-test - Cleaned up code to resolve warnings and errors reported by PyCharm.
|
|
@ -112,6 +112,7 @@ def get_collection_version():
|
|||
sys.modules['collection_detail'] = collection_detail
|
||||
collection_detail_spec.loader.exec_module(collection_detail)
|
||||
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
result = collection_detail.read_manifest_json('.') or collection_detail.read_galaxy_yml('.')
|
||||
return SemanticVersion(result['version'])
|
||||
|
|
|
@ -81,7 +81,7 @@ class YamlChecker:
|
|||
|
||||
def check(self, paths):
|
||||
"""
|
||||
:type paths: str
|
||||
:type paths: t.List[str]
|
||||
"""
|
||||
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config')
|
||||
|
||||
|
|
|
@ -4,7 +4,6 @@ from __future__ import (absolute_import, division, print_function)
|
|||
__metaclass__ = type
|
||||
|
||||
import json
|
||||
import os
|
||||
|
||||
try:
|
||||
from sys import real_prefix
|
||||
|
|
|
@ -181,12 +181,14 @@ class CryptographyAuthHelper(AuthHelper, ABC): # pylint: disable=abstract-metho
|
|||
private_key = ec.generate_private_key(ec.SECP384R1(), default_backend())
|
||||
public_key = private_key.public_key()
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
private_key_pem = to_text(private_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.PKCS8,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
))
|
||||
|
||||
# noinspection PyTypeChecker
|
||||
public_key_pem = to_text(public_key.public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||
|
|
|
@ -678,7 +678,7 @@ def key_value(argparse, value): # type: (argparse_module, str) -> t.Tuple[str,
|
|||
return parts[0], parts[1]
|
||||
|
||||
|
||||
# noinspection PyProtectedMember
|
||||
# noinspection PyProtectedMember,PyUnresolvedReferences
|
||||
def add_coverage_analyze(coverage_subparsers, coverage_common): # type: (argparse_module._SubParsersAction, argparse_module.ArgumentParser) -> None
|
||||
"""Add the `coverage analyze` subcommand."""
|
||||
analyze = coverage_subparsers.add_parser(
|
||||
|
|
|
@ -97,6 +97,7 @@ class AwsCloudEnvironment(CloudEnvironment):
|
|||
resource_prefix=self.resource_prefix,
|
||||
)
|
||||
|
||||
# noinspection PyTypeChecker
|
||||
ansible_vars.update(dict(parser.items('default')))
|
||||
|
||||
display.sensitive.add(ansible_vars.get('aws_secret_key'))
|
||||
|
|
|
@ -30,6 +30,7 @@ from .data import (
|
|||
)
|
||||
|
||||
try:
|
||||
# noinspection PyTypeChecker
|
||||
TIntegrationConfig = t.TypeVar('TIntegrationConfig', bound='IntegrationConfig')
|
||||
except AttributeError:
|
||||
TIntegrationConfig = None # pylint: disable=invalid-name
|
||||
|
|
|
@ -163,8 +163,8 @@ def enumerate_python_arcs(
|
|||
try:
|
||||
original.read_file(path)
|
||||
except Exception as ex: # pylint: disable=locally-disabled, broad-except
|
||||
with open_binary_file(path) as file:
|
||||
header = file.read(6)
|
||||
with open_binary_file(path) as file_obj:
|
||||
header = file_obj.read(6)
|
||||
|
||||
if header == b'SQLite':
|
||||
display.error('File created by "coverage" 5.0+: %s' % os.path.relpath(path))
|
||||
|
|
|
@ -6,6 +6,8 @@ import json
|
|||
import os
|
||||
import time
|
||||
|
||||
from . import types as t
|
||||
|
||||
from .io import (
|
||||
open_binary_file,
|
||||
read_text_file,
|
||||
|
|
|
@ -30,6 +30,7 @@ from .core_ci import (
|
|||
from .manage_ci import (
|
||||
ManageWindowsCI,
|
||||
ManageNetworkCI,
|
||||
get_network_settings,
|
||||
)
|
||||
|
||||
from .cloud import (
|
||||
|
@ -73,7 +74,6 @@ from .util import (
|
|||
|
||||
from .util_common import (
|
||||
get_docker_completion,
|
||||
get_network_settings,
|
||||
get_remote_completion,
|
||||
get_python_path,
|
||||
intercept_command,
|
||||
|
@ -259,7 +259,6 @@ def get_cryptography_requirement(args, python, python_version): # type: (Enviro
|
|||
Return the correct cryptography requirement for the given python version.
|
||||
The version of cryptography installed depends on the python version, setuptools version and openssl version.
|
||||
"""
|
||||
python = find_python(python_version)
|
||||
setuptools_version = get_setuptools_version(args, python)
|
||||
openssl_version = get_openssl_version(args, python, python_version)
|
||||
|
||||
|
@ -624,7 +623,7 @@ def command_network_integration(args):
|
|||
time.sleep(1)
|
||||
|
||||
remotes = [instance.wait_for_result() for instance in instances]
|
||||
inventory = network_inventory(remotes)
|
||||
inventory = network_inventory(args, remotes)
|
||||
|
||||
display.info('>>> Inventory: %s\n%s' % (inventory_path, inventory.strip()), verbosity=3)
|
||||
|
||||
|
@ -702,14 +701,15 @@ def network_run(args, platform, version, config):
|
|||
core_ci.load(config)
|
||||
core_ci.wait()
|
||||
|
||||
manage = ManageNetworkCI(core_ci)
|
||||
manage = ManageNetworkCI(args, core_ci)
|
||||
manage.wait()
|
||||
|
||||
return core_ci
|
||||
|
||||
|
||||
def network_inventory(remotes):
|
||||
def network_inventory(args, remotes):
|
||||
"""
|
||||
:type args: NetworkIntegrationConfig
|
||||
:type remotes: list[AnsibleCoreCI]
|
||||
:rtype: str
|
||||
"""
|
||||
|
@ -723,7 +723,7 @@ def network_inventory(remotes):
|
|||
ansible_ssh_private_key_file=os.path.abspath(remote.ssh_key.key),
|
||||
)
|
||||
|
||||
settings = get_network_settings(remote.args, remote.platform, remote.version)
|
||||
settings = get_network_settings(args, remote.platform, remote.version)
|
||||
|
||||
options.update(settings.inventory_vars)
|
||||
|
||||
|
|
|
@ -28,8 +28,8 @@ def read_text_file(path): # type: (t.AnyStr) -> t.Text
|
|||
|
||||
def read_binary_file(path): # type: (t.AnyStr) -> bytes
|
||||
"""Return the contents of the specified path as bytes."""
|
||||
with open_binary_file(path) as file:
|
||||
return file.read()
|
||||
with open_binary_file(path) as file_obj:
|
||||
return file_obj.read()
|
||||
|
||||
|
||||
def make_dirs(path): # type: (str) -> None
|
||||
|
@ -63,8 +63,8 @@ def write_text_file(path, content, create_directories=False): # type: (str, str
|
|||
if create_directories:
|
||||
make_dirs(os.path.dirname(path))
|
||||
|
||||
with open_binary_file(path, 'wb') as file:
|
||||
file.write(to_bytes(content))
|
||||
with open_binary_file(path, 'wb') as file_obj:
|
||||
file_obj.write(to_bytes(content))
|
||||
|
||||
|
||||
def open_text_file(path, mode='r'): # type: (str, str) -> t.TextIO
|
||||
|
@ -91,4 +91,4 @@ class SortedSetEncoder(json.JSONEncoder):
|
|||
if isinstance(obj, set):
|
||||
return sorted(obj)
|
||||
|
||||
return super(SortedSetEncoder).default(self, obj)
|
||||
return json.JSONEncoder.default(self, obj)
|
||||
|
|
|
@ -6,6 +6,8 @@ import os
|
|||
import tempfile
|
||||
import time
|
||||
|
||||
from . import types as t
|
||||
|
||||
from .util import (
|
||||
SubprocessError,
|
||||
ApplicationError,
|
||||
|
@ -16,7 +18,7 @@ from .util import (
|
|||
|
||||
from .util_common import (
|
||||
intercept_command,
|
||||
get_network_settings,
|
||||
get_network_completion,
|
||||
run_command,
|
||||
)
|
||||
|
||||
|
@ -29,6 +31,7 @@ from .ansible_util import (
|
|||
)
|
||||
|
||||
from .config import (
|
||||
NetworkIntegrationConfig,
|
||||
ShellConfig,
|
||||
)
|
||||
|
||||
|
@ -142,15 +145,17 @@ class ManageWindowsCI:
|
|||
|
||||
class ManageNetworkCI:
|
||||
"""Manage access to a network instance provided by Ansible Core CI."""
|
||||
def __init__(self, core_ci):
|
||||
def __init__(self, args, core_ci):
|
||||
"""
|
||||
:type args: NetworkIntegrationConfig
|
||||
:type core_ci: AnsibleCoreCI
|
||||
"""
|
||||
self.args = args
|
||||
self.core_ci = core_ci
|
||||
|
||||
def wait(self):
|
||||
"""Wait for instance to respond to ansible ping."""
|
||||
settings = get_network_settings(self.core_ci.args, self.core_ci.platform, self.core_ci.version)
|
||||
settings = get_network_settings(self.args, self.core_ci.platform, self.core_ci.version)
|
||||
|
||||
extra_vars = [
|
||||
'ansible_host=%s' % self.core_ci.connection.hostname,
|
||||
|
@ -333,3 +338,27 @@ class ManagePosixCI:
|
|||
time.sleep(10)
|
||||
|
||||
raise ApplicationError('Failed transfer: %s -> %s' % (src, dst))
|
||||
|
||||
|
||||
def get_network_settings(args, platform, version): # type: (NetworkIntegrationConfig, str, str) -> NetworkPlatformSettings
|
||||
"""Returns settings for the given network platform and version."""
|
||||
platform_version = '%s/%s' % (platform, version)
|
||||
completion = get_network_completion().get(platform_version, {})
|
||||
collection = args.platform_collection.get(platform, completion.get('collection'))
|
||||
|
||||
settings = NetworkPlatformSettings(
|
||||
collection,
|
||||
dict(
|
||||
ansible_connection=args.platform_connection.get(platform, completion.get('connection')),
|
||||
ansible_network_os='%s.%s' % (collection, platform) if collection else platform,
|
||||
)
|
||||
)
|
||||
|
||||
return settings
|
||||
|
||||
|
||||
class NetworkPlatformSettings:
|
||||
"""Settings required for provisioning a network platform."""
|
||||
def __init__(self, collection, inventory_vars): # type: (str, t.Type[str, str]) -> None
|
||||
self.collection = collection
|
||||
self.inventory_vars = inventory_vars
|
||||
|
|
|
@ -120,7 +120,7 @@ def create_payload(args, dst_path): # type: (CommonConfig, str) -> None
|
|||
|
||||
start = time.time()
|
||||
|
||||
with tarfile.TarFile.open(dst_path, mode='w:gz', compresslevel=4, format=tarfile.GNU_FORMAT) as tar:
|
||||
with tarfile.open(dst_path, mode='w:gz', compresslevel=4, format=tarfile.GNU_FORMAT) as tar:
|
||||
for src, dst in files:
|
||||
display.info('%s -> %s' % (src, dst), verbosity=4)
|
||||
tar.add(src, dst, filter=filters.get(dst))
|
||||
|
|
|
@ -15,6 +15,7 @@ from ..util import (
|
|||
|
||||
|
||||
try:
|
||||
# noinspection PyTypeChecker
|
||||
TPathProvider = t.TypeVar('TPathProvider', bound='PathProvider')
|
||||
except AttributeError:
|
||||
TPathProvider = None # pylint: disable=invalid-name
|
||||
|
|
|
@ -200,7 +200,7 @@ class LayoutProvider(PathProvider):
|
|||
"""Create a layout using the given root and paths."""
|
||||
|
||||
|
||||
def paths_to_tree(paths): # type: (t.List[str]) -> t.Tuple(t.Dict[str, t.Any], t.List[str])
|
||||
def paths_to_tree(paths): # type: (t.List[str]) -> t.Tuple[t.Dict[str, t.Any], t.List[str]]
|
||||
"""Return a filesystem tree from the given list of paths."""
|
||||
tree = {}, []
|
||||
|
||||
|
@ -219,7 +219,7 @@ def paths_to_tree(paths): # type: (t.List[str]) -> t.Tuple(t.Dict[str, t.Any],
|
|||
return tree
|
||||
|
||||
|
||||
def get_tree_item(tree, parts): # type: (t.Tuple(t.Dict[str, t.Any], t.List[str]), t.List[str]) -> t.Optional[t.Tuple(t.Dict[str, t.Any], t.List[str])]
|
||||
def get_tree_item(tree, parts): # type: (t.Tuple[t.Dict[str, t.Any], t.List[str]], t.List[str]) -> t.Optional[t.Tuple[t.Dict[str, t.Any], t.List[str]]]
|
||||
"""Return the portion of the tree found under the path given by parts, or None if it does not exist."""
|
||||
root = tree
|
||||
|
||||
|
|
|
@ -127,7 +127,7 @@ class AnsibleDocTest(SanitySingleVersion):
|
|||
|
||||
if stderr:
|
||||
# ignore removed module/plugin warnings
|
||||
stderr = re.sub(r'\[WARNING\]: [^ ]+ [^ ]+ has been removed\n', '', stderr).strip()
|
||||
stderr = re.sub(r'\[WARNING]: [^ ]+ [^ ]+ has been removed\n', '', stderr).strip()
|
||||
|
||||
if stderr:
|
||||
summary = u'Output on stderr from ansible-doc is considered an error.\n\n%s' % SubprocessError(cmd, stderr=stderr)
|
||||
|
|
|
@ -33,11 +33,13 @@ from .data import (
|
|||
MODULE_EXTENSIONS = '.py', '.ps1'
|
||||
|
||||
try:
|
||||
# noinspection PyTypeChecker
|
||||
TCompletionTarget = t.TypeVar('TCompletionTarget', bound='CompletionTarget')
|
||||
except AttributeError:
|
||||
TCompletionTarget = None # pylint: disable=invalid-name
|
||||
|
||||
try:
|
||||
# noinspection PyTypeChecker
|
||||
TIntegrationTarget = t.TypeVar('TIntegrationTarget', bound='IntegrationTarget')
|
||||
except AttributeError:
|
||||
TIntegrationTarget = None # pylint: disable=invalid-name
|
||||
|
|
|
@ -157,6 +157,7 @@ class TestResult:
|
|||
try:
|
||||
to_xml_string = self.junit.to_xml_report_string
|
||||
except AttributeError:
|
||||
# noinspection PyDeprecation
|
||||
to_xml_string = self.junit.TestSuite.to_xml_string
|
||||
|
||||
report = to_xml_string(test_suites=test_suites, prettyprint=True, encoding='utf-8')
|
||||
|
|
|
@ -615,7 +615,7 @@ class Display:
|
|||
"""
|
||||
:type message: str
|
||||
:type color: str | None
|
||||
:type fd: file
|
||||
:type fd: t.IO[str]
|
||||
:type truncate: bool
|
||||
"""
|
||||
if self.redact and self.sensitive:
|
||||
|
@ -815,13 +815,11 @@ def load_module(path, name): # type: (str, str) -> None
|
|||
return
|
||||
|
||||
if sys.version_info >= (3, 4):
|
||||
# noinspection PyUnresolvedReferences
|
||||
import importlib.util
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
spec = importlib.util.spec_from_file_location(name, path)
|
||||
# noinspection PyUnresolvedReferences
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
# noinspection PyUnresolvedReferences
|
||||
spec.loader.exec_module(module)
|
||||
|
||||
sys.modules[name] = module
|
||||
|
|
|
@ -115,13 +115,6 @@ class CommonConfig:
|
|||
return os.path.join(ANSIBLE_TEST_DATA_ROOT, 'ansible.cfg')
|
||||
|
||||
|
||||
class NetworkPlatformSettings:
|
||||
"""Settings required for provisioning a network platform."""
|
||||
def __init__(self, collection, inventory_vars): # type: (str, t.Type[str, str]) -> None
|
||||
self.collection = collection
|
||||
self.inventory_vars = inventory_vars
|
||||
|
||||
|
||||
def get_docker_completion():
|
||||
"""
|
||||
:rtype: dict[str, dict[str, str]]
|
||||
|
@ -185,23 +178,6 @@ def docker_qualify_image(name):
|
|||
return config.get('name', name)
|
||||
|
||||
|
||||
def get_network_settings(args, platform, version): # type: (NetworkIntegrationConfig, str, str) -> NetworkPlatformSettings
|
||||
"""Returns settings for the given network platform and version."""
|
||||
platform_version = '%s/%s' % (platform, version)
|
||||
completion = get_network_completion().get(platform_version, {})
|
||||
collection = args.platform_collection.get(platform, completion.get('collection'))
|
||||
|
||||
settings = NetworkPlatformSettings(
|
||||
collection,
|
||||
dict(
|
||||
ansible_connection=args.platform_connection.get(platform, completion.get('connection')),
|
||||
ansible_network_os='%s.%s' % (collection, platform) if collection else platform,
|
||||
)
|
||||
)
|
||||
|
||||
return settings
|
||||
|
||||
|
||||
def handle_layout_messages(messages): # type: (t.Optional[LayoutMessages]) -> None
|
||||
"""Display the given layout messages."""
|
||||
if not messages:
|
||||
|
|
Loading…
Add table
Reference in a new issue