ansible/test/lib/ansible_test/_internal/coverage/combine.py
Matt Clay fa2be89cd4 Add coverage exporting to ansible-test
A new `--export` option for `ansible-test coverage combine` allows multi-step aggregation of code coverage for CI pipelines.
2020-11-09 22:34:05 -08:00

297 lines
8.2 KiB
Python

"""Combine code coverage files."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
from ..target import (
walk_compile_targets,
walk_powershell_targets,
)
from ..io import (
read_text_file,
)
from ..util import (
display,
)
from ..util_common import (
ResultType,
write_json_file,
write_json_test_results,
)
from . import (
enumerate_python_arcs,
enumerate_powershell_lines,
get_collection_path_regexes,
get_python_coverage_files,
get_python_modules,
get_powershell_coverage_files,
initialize_coverage,
COVERAGE_OUTPUT_FILE_NAME,
COVERAGE_GROUPS,
CoverageConfig,
PathChecker,
)
def command_coverage_combine(args):
    """Combine PowerShell and Python coverage data and report each generated file.

    The PowerShell results are produced first, followed by the Python results.

    :type args: CoverageConfig
    :rtype: list[str]
    """
    powershell_outputs = _command_coverage_combine_powershell(args)
    python_outputs = _command_coverage_combine_python(args)
    combined_outputs = powershell_outputs + python_outputs

    for combined_output in combined_outputs:
        display.info('Generated combined output: %s' % combined_output, verbosity=1)

    return combined_outputs
def _command_coverage_combine_python(args):
    """Merge Python coverage files into one combined coverage data file per group.

    Arcs are accumulated per group and per source file, filtered through a path checker,
    and written out unless ``args.explain`` is set.

    :type args: CoverageConfig
    :rtype: list[str]
    """
    coverage = initialize_coverage(args)
    modules = get_python_modules()
    coverage_files = get_python_coverage_files()

    sources = _get_coverage_targets(args, walk_compile_targets)
    groups = _build_stub_groups(args, sources, lambda line_count: set())

    collection_search_re, collection_sub_re = get_collection_path_regexes()

    for position, coverage_path in enumerate(coverage_files, 1):
        display.info('[%4d/%4d] %s' % (position, len(coverage_files), coverage_path), verbosity=2)

        group = get_coverage_group(args, coverage_path)

        if group is None:
            display.warning('Unexpected name for coverage file: %s' % coverage_path)
            continue

        for filename, arcs in enumerate_python_arcs(coverage_path, coverage, modules, collection_search_re, collection_sub_re):
            # the group entry is only created once a coverage file actually yields data for it
            groups.setdefault(group, {}).setdefault(filename, set()).update(arcs)

    output_files = []

    if args.export:
        # joining with an empty component leaves a trailing separator on the export directory
        output_prefix = os.path.join(args.export, '')
        output_suffix = '=coverage.combined'
    else:
        output_prefix = os.path.join(ResultType.COVERAGE.path, COVERAGE_OUTPUT_FILE_NAME)
        output_suffix = ''

    path_checker = PathChecker(args, collection_search_re)

    for group in sorted(groups):
        updated = coverage.CoverageData()

        for filename, arcs in groups[group].items():
            if path_checker.check_path(filename):
                updated.add_arcs({filename: list(arcs)})

        if args.all:
            # register every known source with no arcs so files without coverage are included
            updated.add_arcs(dict((source_path, []) for source_path, _line_count in sources))

        if args.explain:
            continue

        output_file = output_prefix + group + output_suffix
        updated.write_file(output_file)  # always write files to make sure stale files do not exist

        if updated:
            # only report files which are non-empty to prevent coverage from reporting errors
            output_files.append(output_file)

    path_checker.report()

    return sorted(output_files)
def _command_coverage_combine_powershell(args):
    """Merge PowerShell coverage files into one combined result per group.

    Hit counts for the same file and line are summed across coverage files. Results are
    written out unless ``args.explain`` is set.

    :type args: CoverageConfig
    :rtype: list[str]
    """
    coverage_files = get_powershell_coverage_files()

    def _default_stub_value(lines):
        """Return a dict mapping each index in ``range(lines)`` to a hit count of zero."""
        return dict((line, 0) for line in range(lines))

    sources = _get_coverage_targets(args, walk_powershell_targets)
    groups = _build_stub_groups(args, sources, _default_stub_value)

    collection_search_re, collection_sub_re = get_collection_path_regexes()

    for position, coverage_path in enumerate(coverage_files, 1):
        display.info('[%4d/%4d] %s' % (position, len(coverage_files), coverage_path), verbosity=2)

        group = get_coverage_group(args, coverage_path)

        if group is None:
            display.warning('Unexpected name for coverage file: %s' % coverage_path)
            continue

        for filename, hits in enumerate_powershell_lines(coverage_path, collection_search_re, collection_sub_re):
            # the group entry is only created once a coverage file actually yields data for it
            file_coverage = groups.setdefault(group, {}).setdefault(filename, {})

            for line_no, hit_count in hits.items():
                file_coverage[line_no] = file_coverage.get(line_no, 0) + hit_count

    output_files = []

    path_checker = PathChecker(args)

    for group in sorted(groups):
        coverage_data = {}

        for filename, data in groups[group].items():
            if path_checker.check_path(filename):
                coverage_data[filename] = data

        if args.all:
            # Add 0 line entries for files not in coverage_data
            for source, source_line_count in sources:
                if source not in coverage_data:
                    coverage_data[source] = _default_stub_value(source_line_count)

        if args.explain:
            continue

        if args.export:
            output_file = os.path.join(args.export, group + '=coverage.combined')
            write_json_file(output_file, coverage_data, formatted=False)
            output_files.append(output_file)
        else:
            output_name = COVERAGE_OUTPUT_FILE_NAME + group + '-powershell'
            write_json_test_results(ResultType.COVERAGE, output_name, coverage_data, formatted=False)
            output_files.append(os.path.join(ResultType.COVERAGE.path, output_name))

    path_checker.report()

    return sorted(output_files)
def _get_coverage_targets(args, walk_func):
    """Return sorted (absolute path, line count) pairs for the targets found by ``walk_func``.

    The walk only happens when ``args.all`` or ``args.stub`` is set; otherwise an empty list is returned.

    :type args: CoverageConfig
    :type walk_func: callable
    :rtype: list[tuple[str, int]]
    """
    if not (args.all or args.stub):
        return []

    found = []

    # excludes symlinks of regular files to avoid reporting on the same file multiple times
    # in the future it would be nice to merge any coverage for symlinks into the real files
    for target in walk_func(include_symlinks=False):
        absolute_path = os.path.abspath(target.path)
        line_total = len(read_text_file(absolute_path).splitlines())
        found.append((absolute_path, line_total))

    return sorted(found)
def _build_stub_groups(args, sources, default_stub_value, stub_line_limit=500000):
    """Partition the sources into numbered stub groups bounded by total line count.

    Nothing is built unless ``args.stub`` is set. Sources are added to the current group in order,
    and a new group is started once the running line count of the current group exceeds
    ``stub_line_limit``, so a group can overshoot the limit by the size of its last source.
    Groups are keyed as ``=stub-NN``, numbered from 01.

    :type args: CoverageConfig
    :type sources: list[tuple[str, int]]
    :type default_stub_value: (int) -> object
    :type stub_line_limit: int
    :rtype: dict[str, dict[str, object]]
    """
    groups = {}

    if not args.stub:
        return groups

    batches = [[]]
    batch_line_count = 0

    for source, source_line_count in sources:
        batches[-1].append((source, source_line_count))
        batch_line_count += source_line_count

        if batch_line_count > stub_line_limit:
            # the limit is checked after adding, so the source that crosses it stays in this batch
            batches.append([])
            batch_line_count = 0

    for stub_number, batch in enumerate(batches, 1):
        if not batch:
            # only the trailing batch can be empty (no sources, or the last source closed a batch)
            continue

        groups['=stub-%02d' % stub_number] = dict((source, default_stub_value(line_count))
                                                  for source, line_count in batch)

    return groups
def get_coverage_group(args, coverage_file):
    """Derive the group name for a coverage file from the ``=`` separated fields of its base name.

    The base name must split into five fields (command, target, environment, version and a
    remainder starting with ``coverage.``); otherwise ``None`` is returned. Each entry of
    ``COVERAGE_GROUPS`` that is listed in ``args.group_by`` contributes ``=<value>`` to the result.
    When exporting, entries not grouped by contribute the version or the literal ``various``,
    and leading ``=`` characters are stripped from the result.

    :type args: CoverageConfig
    :type coverage_file: str
    :rtype: str | None
    """
    fields = os.path.basename(coverage_file).split('=', 4)

    if len(fields) != 5:
        return None

    command, target, environment, version, remainder = fields

    if not remainder.startswith('coverage.'):
        return None

    names = {
        'command': command,
        'target': target,
        'environment': environment,
        'version': version,
    }

    export_names = {
        'version': version,
    }

    segments = []

    for part in COVERAGE_GROUPS:
        if part in args.group_by:
            segments.append('=%s' % names[part])
        elif args.export:
            segments.append('=%s' % export_names.get(part, 'various'))

    group = ''.join(segments)

    if args.export:
        group = group.lstrip('=')

    return group