ansible/test/lib/ansible_test/_internal/diff.py

257 lines
7.2 KiB
Python
Raw Normal View History

"""Diff parsing functions and classes."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import re
import textwrap
import traceback
from . import types as t
from .util import (
ApplicationError,
)
def parse_diff(lines):
"""
:type lines: list[str]
:rtype: list[FileDiff]
"""
return DiffParser(lines).files
class FileDiff:
"""Parsed diff for a single file."""
def __init__(self, old_path, new_path):
"""
:type old_path: str
:type new_path: str
"""
self.old = DiffSide(old_path, new=False)
self.new = DiffSide(new_path, new=True)
self.headers = [] # type: t.List[str]
self.binary = False
def append_header(self, line):
"""
:type line: str
"""
self.headers.append(line)
@property
def is_complete(self):
"""
:rtype: bool
"""
return self.old.is_complete and self.new.is_complete
class DiffSide:
"""Parsed diff for a single 'side' of a single file."""
def __init__(self, path, new):
"""
:type path: str
:type new: bool
"""
self.path = path
self.new = new
self.prefix = '+' if self.new else '-'
self.eof_newline = True
self.exists = True
self.lines = [] # type: t.List[t.Tuple[int, str]]
self.lines_and_context = [] # type: t.List[t.Tuple[int, str]]
self.ranges = [] # type: t.List[t.Tuple[int, int]]
self._next_line_number = 0
self._lines_remaining = 0
self._range_start = 0
def set_start(self, line_start, line_count):
"""
:type line_start: int
:type line_count: int
"""
self._next_line_number = line_start
self._lines_remaining = line_count
self._range_start = 0
def append(self, line):
"""
:type line: str
"""
if self._lines_remaining <= 0:
raise Exception('Diff range overflow.')
entry = self._next_line_number, line
if line.startswith(' '):
pass
elif line.startswith(self.prefix):
self.lines.append(entry)
if not self._range_start:
self._range_start = self._next_line_number
else:
raise Exception('Unexpected diff content prefix.')
self.lines_and_context.append(entry)
self._lines_remaining -= 1
if self._range_start:
if self.is_complete:
range_end = self._next_line_number
elif line.startswith(' '):
range_end = self._next_line_number - 1
else:
range_end = 0
if range_end:
self.ranges.append((self._range_start, range_end))
self._range_start = 0
self._next_line_number += 1
@property
def is_complete(self):
"""
:rtype: bool
"""
return self._lines_remaining == 0
def format_lines(self, context=True):
"""
:type context: bool
:rtype: list[str]
"""
if context:
lines = self.lines_and_context
else:
lines = self.lines
return ['%s:%4d %s' % (self.path, line[0], line[1]) for line in lines]
class DiffParser:
"""Parse diff lines."""
def __init__(self, lines):
"""
:type lines: list[str]
"""
self.lines = lines
self.files = [] # type: t.List[FileDiff]
self.action = self.process_start
self.line_number = 0
self.previous_line = None # type: t.Optional[str]
self.line = None # type: t.Optional[str]
self.file = None # type: t.Optional[FileDiff]
for self.line in self.lines:
self.line_number += 1
try:
self.action()
except Exception as ex:
message = textwrap.dedent('''
%s
Line: %d
Previous: %s
Current: %s
%s
''').strip() % (
ex,
self.line_number,
self.previous_line or '',
self.line or '',
traceback.format_exc(),
)
raise ApplicationError(message.strip())
self.previous_line = self.line
self.complete_file()
def process_start(self):
"""Process a diff start line."""
self.complete_file()
match = re.search(r'^diff --git "?(?:a/)?(?P<old_path>.*)"? "?(?:b/)?(?P<new_path>.*)"?$', self.line)
if not match:
raise Exception('Unexpected diff start line.')
self.file = FileDiff(match.group('old_path'), match.group('new_path'))
self.action = self.process_continue
def process_range(self):
"""Process a diff range line."""
match = re.search(r'^@@ -((?P<old_start>[0-9]+),)?(?P<old_count>[0-9]+) \+((?P<new_start>[0-9]+),)?(?P<new_count>[0-9]+) @@', self.line)
if not match:
raise Exception('Unexpected diff range line.')
self.file.old.set_start(int(match.group('old_start') or 1), int(match.group('old_count')))
self.file.new.set_start(int(match.group('new_start') or 1), int(match.group('new_count')))
self.action = self.process_content
def process_continue(self):
"""Process a diff start, range or header line."""
if self.line.startswith('diff '):
self.process_start()
elif self.line.startswith('@@ '):
self.process_range()
else:
self.process_header()
def process_header(self):
"""Process a diff header line."""
if self.line.startswith('Binary files '):
self.file.binary = True
elif self.line == '--- /dev/null':
self.file.old.exists = False
elif self.line == '+++ /dev/null':
self.file.new.exists = False
else:
self.file.append_header(self.line)
def process_content(self):
"""Process a diff content line."""
if self.line == r'\ No newline at end of file':
if self.previous_line.startswith(' '):
self.file.old.eof_newline = False
self.file.new.eof_newline = False
elif self.previous_line.startswith('-'):
self.file.old.eof_newline = False
elif self.previous_line.startswith('+'):
self.file.new.eof_newline = False
else:
raise Exception('Unexpected previous diff content line.')
return
if self.file.is_complete:
self.process_continue()
return
if self.line.startswith(' '):
self.file.old.append(self.line)
self.file.new.append(self.line)
elif self.line.startswith('-'):
self.file.old.append(self.line)
elif self.line.startswith('+'):
self.file.new.append(self.line)
else:
raise Exception('Unexpected diff content line.')
def complete_file(self):
"""Complete processing of the current file, if any."""
if not self.file:
return
self.files.append(self.file)