From 235aa47dd37cf80c9f2d19280b11f1722b80c27c Mon Sep 17 00:00:00 2001
From: Alfredo Di Stasio
Date: Fri, 24 Apr 2026 15:12:51 +0200
Subject: [PATCH] Harden parser for malformed multiline records

---
 app/services/parser.py | 114 ++++++++++++++++++++++-------------
 tests/test_parser.py   |  16 ++++++
 2 files changed, 78 insertions(+), 52 deletions(-)

diff --git a/app/services/parser.py b/app/services/parser.py
index 4f362cc..c3e4c14 100644
--- a/app/services/parser.py
+++ b/app/services/parser.py
@@ -1,5 +1,9 @@
 from collections import OrderedDict
 from io import BufferedIOBase, TextIOBase
+import re
+
+RECORD_PREFIX = "v015xxxxdate="
+KEY_PATTERN = re.compile(r"(?:(?<=\s)|^)([A-Za-z_][A-Za-z0-9_]*)=")
 
 
 class LogParseError(ValueError):
@@ -20,48 +24,70 @@ def _decode_log_content(raw_bytes: bytes | str) -> str:
     raise UnicodeDecodeError("unknown", b"", 0, 1, "Unsupported text encoding.")
 
 
-def _tokenize_line(line: str) -> list[str]:
-    """Split a line using shell-like rules while tolerating unmatched trailing quotes."""
-    tokens: list[str] = []
-    current: list[str] = []
-    quote_char: str | None = None
-    escape_next = False
+def _normalize_value(value: str) -> str:
+    """Remove balanced shell-style quotes while tolerating malformed values."""
+    value = value.strip()
+    if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}:
+        return value[1:-1]
+    if value[:1] in {'"', "'"}:
+        return value[1:]
+    return value
 
-    for char in line:
-        if escape_next:
-            current.append(char)
-            escape_next = False
+
+def _parse_record(line: str, line_number: int) -> dict[str, str]:
+    """Parse a logical record by locating `key=` boundaries instead of splitting on spaces."""
+    matches = list(KEY_PATTERN.finditer(line))
+    if not matches:
+        raise LogParseError(f"Line {line_number}: no key=value pairs were found.")
+
+    record: dict[str, str] = {}
+    for index, match in enumerate(matches):
+        key = match.group(1)
+        value_start = match.end()
+        value_end = matches[index + 1].start() if index + 1 < len(matches) else len(line)
+        raw_value = line[value_start:value_end].strip()
+        if raw_value and raw_value[:1] not in {'"', "'"} and any(
+            char.isspace() for char in raw_value
+        ):
+            raise LogParseError(
+                f"Line {line_number}: invalid unquoted value for key '{key}'."
+            )
+        value = _normalize_value(raw_value)
+        record[key] = value
+
+    return record
+
+
+def _iter_logical_records(content: str) -> list[tuple[int, str]]:
+    """Rebuild logical records when embedded newlines split a single log entry."""
+    records: list[tuple[int, str]] = []
+    current_record: list[str] = []
+    current_start_line: int | None = None
+
+    for line_number, raw_line in enumerate(content.splitlines(), start=1):
+        line = raw_line.strip()
+        if not line:
             continue
 
-        if char == "\\":
-            escape_next = True
+        if line.startswith(RECORD_PREFIX):
+            if current_record and current_start_line is not None:
+                records.append((current_start_line, "".join(current_record)))
+            current_record = [line]
+            current_start_line = line_number
             continue
 
-        if quote_char is not None:
-            if char == quote_char:
-                quote_char = None
-            else:
-                current.append(char)
+        if current_record:
+            current_record.append(line)
             continue
 
-        if char in {'"', "'"}:
-            quote_char = char
-            continue
+        raise LogParseError(
+            f"Line {line_number}: unexpected content before the first log record."
+        )
 
-        if char.isspace():
-            if current:
-                tokens.append("".join(current))
-                current = []
-            continue
+    if current_record and current_start_line is not None:
+        records.append((current_start_line, "".join(current_record)))
 
-        current.append(char)
-
-    if escape_next:
-        current.append("\\")
-    if current:
-        tokens.append("".join(current))
-
-    return tokens
+    return records
 
 
 def parse_log_file(stream: BufferedIOBase | TextIOBase) -> tuple[list[dict[str, str]], list[str]]:
@@ -72,25 +98,9 @@ def parse_log_file(stream: BufferedIOBase | TextIOBase) -> tuple[list[dict[str,
     records: list[dict[str, str]] = []
     seen_keys: OrderedDict[str, None] = OrderedDict()
 
-    for line_number, raw_line in enumerate(content.splitlines(), start=1):
-        line = raw_line.strip()
-        if not line:
-            continue
-
-        tokens = _tokenize_line(line)
-
-        record: dict[str, str] = {}
-        for token in tokens:
-            if "=" not in token:
-                raise LogParseError(
-                    f"Line {line_number}: token '{token}' is missing '='."
-                )
-
-            key, value = token.split("=", 1)
-            if not key:
-                raise LogParseError(f"Line {line_number}: empty key is not allowed.")
-
-            record[key] = value
+    for line_number, line in _iter_logical_records(content):
+        record = _parse_record(line, line_number)
+
         for key in record:
             seen_keys.setdefault(key, None)
         records.append(record)
diff --git a/tests/test_parser.py b/tests/test_parser.py
index dda028d..940861b 100644
--- a/tests/test_parser.py
+++ b/tests/test_parser.py
@@ -58,3 +58,19 @@ def test_parse_log_file_tolerates_unterminated_quotes():
     records, _union_keys = parse_log_file(stream)
 
     assert records[0]["msg"] == "broken quoted value"
+
+
+def test_parse_log_file_rebuilds_record_after_embedded_newlines():
+    stream = io.BytesIO(
+        b'v015xxxxdate=2024-02-15 time=09:10:11 msg="hello\n'
+        b'broken-fragment\n'
+        b'world" action=Alert\n'
+        b'v015xxxxdate=2024-02-15 time=09:10:12 msg="next" action=Monitor\n'
+    )
+
+    records, _union_keys = parse_log_file(stream)
+
+    assert len(records) == 2
+    assert records[0]["msg"] == "hellobroken-fragmentworld"
+    assert records[0]["action"] == "Alert"
+    assert records[1]["msg"] == "next"