Add coverage filtering to ansible-test. (#68158)

* Relocate expand_indexes so it can be reused.

* Add generate_indexes function.

* Simplify type annotations.

* Add `coverage analyze targets filter` command.

* Add changelog entry.
This commit is contained in:
Matt Clay 2020-03-11 12:02:39 -07:00 committed by Matt Martz
parent 787089cba2
commit 8715bc400a
7 changed files with 208 additions and 32 deletions

View file

@ -0,0 +1,2 @@
minor_changes:
- "ansible-test - Added a ``ansible-test coverage analyze targets filter`` command to filter aggregated coverage reports by path and/or target name."

View file

@ -125,6 +125,11 @@ from .coverage.analyze.targets.expand import (
CoverageAnalyzeTargetsExpandConfig, CoverageAnalyzeTargetsExpandConfig,
) )
from .coverage.analyze.targets.filter import (
command_coverage_analyze_targets_filter,
CoverageAnalyzeTargetsFilterConfig,
)
from .coverage.analyze.targets.combine import ( from .coverage.analyze.targets.combine import (
command_coverage_analyze_targets_combine, command_coverage_analyze_targets_combine,
CoverageAnalyzeTargetsCombineConfig, CoverageAnalyzeTargetsCombineConfig,
@ -724,6 +729,51 @@ def add_coverage_analyze(coverage_subparsers, coverage_common): # type: (argpar
help='output file to write expanded coverage to', help='output file to write expanded coverage to',
) )
targets_filter = targets_subparsers.add_parser(
'filter',
parents=[coverage_common],
help='filter aggregated coverage data',
)
targets_filter.set_defaults(
func=command_coverage_analyze_targets_filter,
config=CoverageAnalyzeTargetsFilterConfig,
)
targets_filter.add_argument(
'input_file',
help='input file to read aggregated coverage from',
)
targets_filter.add_argument(
'output_file',
help='output file to write expanded coverage to',
)
targets_filter.add_argument(
'--include-target',
dest='include_targets',
action='append',
help='include the specified targets',
)
targets_filter.add_argument(
'--exclude-target',
dest='exclude_targets',
action='append',
help='exclude the specified targets',
)
targets_filter.add_argument(
'--include-path',
help='include paths matching the given regex',
)
targets_filter.add_argument(
'--exclude-path',
help='exclude paths matching the given regex',
)
targets_combine = targets_subparsers.add_parser( targets_combine = targets_subparsers.add_parser(
'combine', 'combine',
parents=[coverage_common], parents=[coverage_common],

View file

@ -21,6 +21,9 @@ from .. import (
) )
if t.TYPE_CHECKING: if t.TYPE_CHECKING:
TargetKey = t.TypeVar('TargetKey', int, t.Tuple[int, int])
NamedPoints = t.Dict[str, t.Dict[TargetKey, t.Set[str]]]
IndexedPoints = t.Dict[str, t.Dict[TargetKey, t.Set[int]]]
Arcs = t.Dict[str, t.Dict[t.Tuple[int, int], t.Set[int]]] Arcs = t.Dict[str, t.Dict[t.Tuple[int, int], t.Set[int]]]
Lines = t.Dict[str, t.Dict[int, t.Set[int]]] Lines = t.Dict[str, t.Dict[int, t.Set[int]]]
TargetIndexes = t.Dict[str, int] TargetIndexes = t.Dict[str, int]
@ -107,6 +110,42 @@ def get_target_index(name, target_indexes): # type: (str, TargetIndexes) -> int
return target_indexes.setdefault(name, len(target_indexes)) return target_indexes.setdefault(name, len(target_indexes))
def expand_indexes(
source_data, # type: IndexedPoints
source_index, # type: t.List[str]
format_func, # type: t.Callable[t.Tuple[t.Any], str]
): # type: (...) -> NamedPoints
"""Expand indexes from the source into target names for easier processing of the data (arcs or lines)."""
combined_data = {} # type: t.Dict[str, t.Dict[t.Any, t.Set[str]]]
for covered_path, covered_points in source_data.items():
combined_points = combined_data.setdefault(covered_path, {})
for covered_point, covered_target_indexes in covered_points.items():
combined_point = combined_points.setdefault(format_func(covered_point), set())
for covered_target_index in covered_target_indexes:
combined_point.add(source_index[covered_target_index])
return combined_data
def generate_indexes(target_indexes, data): # type: (TargetIndexes, NamedPoints) -> IndexedPoints
"""Return an indexed version of the given data (arcs or points)."""
results = {} # type: IndexedPoints
for path, points in data.items():
result_points = results[path] = {}
for point, target_names in points.items():
result_point = result_points[point] = set()
for target_name in target_names:
result_point.add(get_target_index(target_name, target_indexes))
return results
class CoverageAnalyzeTargetsConfig(CoverageAnalyzeConfig): class CoverageAnalyzeTargetsConfig(CoverageAnalyzeConfig):
"""Configuration for the `coverage analyze targets` command.""" """Configuration for the `coverage analyze targets` command."""
def __init__(self, args): # type: (t.Any) -> None def __init__(self, args): # type: (t.Any) -> None

View file

@ -15,6 +15,7 @@ from . import (
if t.TYPE_CHECKING: if t.TYPE_CHECKING:
from . import ( from . import (
Arcs, Arcs,
IndexedPoints,
Lines, Lines,
TargetIndexes, TargetIndexes,
) )
@ -38,9 +39,9 @@ def command_coverage_analyze_targets_combine(args): # type: (CoverageAnalyzeTar
def merge_indexes( def merge_indexes(
source_data, # type: t.Dict[str, t.Dict[t.Any, t.Set[int]]] source_data, # type: IndexedPoints
source_index, # type: t.List[str] source_index, # type: t.List[str]
combined_data, # type: t.Dict[str, t.Dict[t.Any, t.Set[int]]] combined_data, # type: IndexedPoints
combined_index, # type: TargetIndexes combined_index, # type: TargetIndexes
): # type: (...) -> None ): # type: (...) -> None
"""Merge indexes from the source into the combined data set (arcs or lines).""" """Merge indexes from the source into the combined data set (arcs or lines)."""

View file

@ -11,6 +11,7 @@ from ....io import (
from . import ( from . import (
CoverageAnalyzeTargetsConfig, CoverageAnalyzeTargetsConfig,
expand_indexes,
format_arc, format_arc,
read_report, read_report,
) )
@ -29,26 +30,6 @@ def command_coverage_analyze_targets_expand(args): # type: (CoverageAnalyzeTarg
write_json_file(args.output_file, report, encoder=SortedSetEncoder) write_json_file(args.output_file, report, encoder=SortedSetEncoder)
def expand_indexes(
source_data, # type: t.Dict[str, t.Dict[t.Any, t.Set[int]]]
source_index, # type: t.List[str]
format_func, # type: t.Callable[t.Tuple[t.Any], str]
): # type: (...) -> t.Dict[str, t.Dict[t.Any, t.Set[str]]]
"""Merge indexes from the source into the combined data set (arcs or lines)."""
combined_data = {} # type: t.Dict[str, t.Dict[t.Any, t.Set[str]]]
for covered_path, covered_points in source_data.items():
combined_points = combined_data.setdefault(covered_path, {})
for covered_point, covered_target_indexes in covered_points.items():
combined_point = combined_points.setdefault(format_func(covered_point), set())
for covered_target_index in covered_target_indexes:
combined_point.add(source_index[covered_target_index])
return combined_data
class CoverageAnalyzeTargetsExpandConfig(CoverageAnalyzeTargetsConfig): class CoverageAnalyzeTargetsExpandConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets expand` command.""" """Configuration for the `coverage analyze targets expand` command."""
def __init__(self, args): # type: (t.Any) -> None def __init__(self, args): # type: (t.Any) -> None

View file

@ -0,0 +1,104 @@
"""Filter an aggregated coverage file, keeping only the specified targets."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import re
from .... import types as t
from . import (
CoverageAnalyzeTargetsConfig,
expand_indexes,
generate_indexes,
make_report,
read_report,
write_report,
)
if t.TYPE_CHECKING:
from . import (
NamedPoints,
TargetIndexes,
)
def command_coverage_analyze_targets_filter(args): # type: (CoverageAnalyzeTargetsFilterConfig) -> None
"""Filter target names in an aggregated coverage file."""
covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file)
filtered_path_arcs = expand_indexes(covered_path_arcs, covered_targets, lambda v: v)
filtered_path_lines = expand_indexes(covered_path_lines, covered_targets, lambda v: v)
include_targets = set(args.include_targets) if args.include_targets else None
exclude_targets = set(args.exclude_targets) if args.exclude_targets else None
include_path = re.compile(args.include_path) if args.include_path else None
exclude_path = re.compile(args.exclude_path) if args.exclude_path else None
def path_filter_func(path):
if include_path and not re.search(include_path, path):
return False
if exclude_path and re.search(exclude_path, path):
return False
return True
def target_filter_func(targets):
if include_targets:
targets &= include_targets
if exclude_targets:
targets -= exclude_targets
return targets
filtered_path_arcs = filter_data(filtered_path_arcs, path_filter_func, target_filter_func)
filtered_path_lines = filter_data(filtered_path_lines, path_filter_func, target_filter_func)
target_indexes = {} # type: TargetIndexes
indexed_path_arcs = generate_indexes(target_indexes, filtered_path_arcs)
indexed_path_lines = generate_indexes(target_indexes, filtered_path_lines)
report = make_report(target_indexes, indexed_path_arcs, indexed_path_lines)
write_report(args, report, args.output_file)
def filter_data(
data, # type: NamedPoints
path_filter_func, # type: t.Callable[[str], bool]
target_filter_func, # type: t.Callable[[t.Set[str]], t.Set[str]]
): # type: (...) -> NamedPoints
"""Filter the data set using the specified filter function."""
result = {} # type: NamedPoints
for src_path, src_points in data.items():
if not path_filter_func(src_path):
continue
dst_points = {}
for src_point, src_targets in src_points.items():
dst_targets = target_filter_func(src_targets)
if dst_targets:
dst_points[src_point] = dst_targets
if dst_points:
result[src_path] = dst_points
return result
class CoverageAnalyzeTargetsFilterConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets filter` command."""
def __init__(self, args): # type: (t.Any) -> None
super(CoverageAnalyzeTargetsFilterConfig, self).__init__(args)
self.input_file = args.input_file # type: str
self.output_file = args.output_file # type: str
self.include_targets = args.include_targets # type: t.List[str]
self.exclude_targets = args.exclude_targets # type: t.List[str]
self.include_path = args.include_path # type: t.Optional[str]
self.exclude_path = args.exclude_path # type: t.Optional[str]

View file

@ -21,10 +21,9 @@ from . import (
if t.TYPE_CHECKING: if t.TYPE_CHECKING:
from . import ( from . import (
TargetIndexes, TargetIndexes,
IndexedPoints,
) )
TargetKey = t.TypeVar('TargetKey', int, t.Tuple[int, int])
def command_coverage_analyze_targets_missing(args): # type: (CoverageAnalyzeTargetsMissingConfig) -> None def command_coverage_analyze_targets_missing(args): # type: (CoverageAnalyzeTargetsMissingConfig) -> None
"""Identify aggregated coverage in one file missing from another.""" """Identify aggregated coverage in one file missing from another."""
@ -44,12 +43,12 @@ def command_coverage_analyze_targets_missing(args): # type: (CoverageAnalyzeTar
def find_gaps( def find_gaps(
from_data, # type: t.Dict[str, t.Dict[TargetKey, t.Set[int]]] from_data, # type: IndexedPoints
from_index, # type: t.List[str] from_index, # type: t.List[str]
to_data, # type: t.Dict[str, t.Dict[TargetKey, t.Set[int]]] to_data, # type: IndexedPoints
target_indexes, # type: TargetIndexes, target_indexes, # type: TargetIndexes
only_exists, # type: bool only_exists, # type: bool
): # type: (...) -> t.Dict[str, t.Dict[TargetKey, t.Set[int]]] ): # type: (...) -> IndexedPoints
"""Find gaps in coverage between the from and to data sets.""" """Find gaps in coverage between the from and to data sets."""
target_data = {} target_data = {}
@ -69,13 +68,13 @@ def find_gaps(
def find_missing( def find_missing(
from_data, # type: t.Dict[str, t.Dict[TargetKey, t.Set[int]]] from_data, # type: IndexedPoints
from_index, # type: t.List[str] from_index, # type: t.List[str]
to_data, # type: t.Dict[str, t.Dict[TargetKey, t.Set[int]]] to_data, # type: IndexedPoints
to_index, # type: t.List[str] to_index, # type: t.List[str]
target_indexes, # type: TargetIndexes, target_indexes, # type: TargetIndexes
only_exists, # type: bool only_exists, # type: bool
): # type: (...) -> t.Dict[str, t.Dict[TargetKey, t.Set[int]]] ): # type: (...) -> IndexedPoints
"""Find coverage in from_data not present in to_data (arcs or lines).""" """Find coverage in from_data not present in to_data (arcs or lines)."""
target_data = {} target_data = {}