Harden parser for malformed multiline records
This commit is contained in:
@@ -1,5 +1,9 @@
|
|||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from io import BufferedIOBase, TextIOBase
|
from io import BufferedIOBase, TextIOBase
|
||||||
|
import re
|
||||||
|
|
||||||
|
RECORD_PREFIX = "v015xxxxdate="
|
||||||
|
KEY_PATTERN = re.compile(r"(?:(?<=\s)|^)([A-Za-z_][A-Za-z0-9_]*)=")
|
||||||
|
|
||||||
|
|
||||||
class LogParseError(ValueError):
|
class LogParseError(ValueError):
|
||||||
@@ -20,48 +24,70 @@ def _decode_log_content(raw_bytes: bytes | str) -> str:
|
|||||||
raise UnicodeDecodeError("unknown", b"", 0, 1, "Unsupported text encoding.")
|
raise UnicodeDecodeError("unknown", b"", 0, 1, "Unsupported text encoding.")
|
||||||
|
|
||||||
|
|
||||||
def _tokenize_line(line: str) -> list[str]:
|
def _normalize_value(value: str) -> str:
|
||||||
"""Split a line using shell-like rules while tolerating unmatched trailing quotes."""
|
"""Remove balanced shell-style quotes while tolerating malformed values."""
|
||||||
tokens: list[str] = []
|
value = value.strip()
|
||||||
current: list[str] = []
|
if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}:
|
||||||
quote_char: str | None = None
|
return value[1:-1]
|
||||||
escape_next = False
|
if value[:1] in {'"', "'"}:
|
||||||
|
return value[1:]
|
||||||
|
return value
|
||||||
|
|
||||||
def _parse_record(line: str, line_number: int) -> dict[str, str]:
    """Parse a logical record by locating ``key=`` boundaries instead of splitting on spaces.

    Each value spans from the end of its ``key=`` marker to the start of the
    next marker (or end of line), so quoted values containing spaces survive
    intact even after multiline records have been rejoined.

    Args:
        line: One rebuilt logical record.
        line_number: 1-based line number of the record's first physical line,
            used in error messages.

    Returns:
        Mapping of key to normalized value, in order of appearance.

    Raises:
        LogParseError: If no ``key=value`` pairs are found, or an unquoted
            value contains whitespace (which indicates a malformed record).
    """
    matches = list(KEY_PATTERN.finditer(line))
    if not matches:
        raise LogParseError(f"Line {line_number}: no key=value pairs were found.")

    record: dict[str, str] = {}
    for index, match in enumerate(matches):
        key = match.group(1)
        value_start = match.end()
        # The value runs up to the next key marker, or to the end of the line.
        value_end = matches[index + 1].start() if index + 1 < len(matches) else len(line)
        raw_value = line[value_start:value_end].strip()
        # Whitespace inside an unquoted value means the record is malformed;
        # quoted values (leading ' or ") may legitimately contain spaces.
        if raw_value and raw_value[:1] not in {'"', "'"} and any(
            char.isspace() for char in raw_value
        ):
            raise LogParseError(
                f"Line {line_number}: invalid unquoted value for key '{key}'."
            )
        value = _normalize_value(raw_value)
        record[key] = value

    return record
|
def _iter_logical_records(content: str) -> list[tuple[int, str]]:
    """Rebuild logical records when embedded newlines split a single log entry.

    Physical lines that do not start with ``RECORD_PREFIX`` are treated as
    continuations of the current record and concatenated onto it, so a quoted
    value containing raw newlines is reassembled into one logical line.

    Args:
        content: Full decoded text of the log file.

    Returns:
        List of ``(start_line_number, logical_record_text)`` tuples, where the
        line number is 1-based and refers to the record's first physical line.

    Raises:
        LogParseError: If non-empty content appears before the first record
            prefix is seen.
    """
    records: list[tuple[int, str]] = []
    current_record: list[str] = []
    current_start_line: int | None = None

    for line_number, raw_line in enumerate(content.splitlines(), start=1):
        line = raw_line.strip()
        if not line:
            continue

        if line.startswith(RECORD_PREFIX):
            # A new record begins: flush the one being accumulated, if any.
            if current_record and current_start_line is not None:
                records.append((current_start_line, "".join(current_record)))
            current_record = [line]
            current_start_line = line_number
            continue

        if current_record:
            # Continuation fragment of a record split by embedded newlines.
            current_record.append(line)
            continue

        raise LogParseError(
            f"Line {line_number}: unexpected content before the first log record."
        )

    # Flush the final record after the loop ends.
    if current_record and current_start_line is not None:
        records.append((current_start_line, "".join(current_record)))

    return records
def parse_log_file(stream: BufferedIOBase | TextIOBase) -> tuple[list[dict[str, str]], list[str]]:
|
def parse_log_file(stream: BufferedIOBase | TextIOBase) -> tuple[list[dict[str, str]], list[str]]:
|
||||||
@@ -72,25 +98,9 @@ def parse_log_file(stream: BufferedIOBase | TextIOBase) -> tuple[list[dict[str,
|
|||||||
records: list[dict[str, str]] = []
|
records: list[dict[str, str]] = []
|
||||||
seen_keys: OrderedDict[str, None] = OrderedDict()
|
seen_keys: OrderedDict[str, None] = OrderedDict()
|
||||||
|
|
||||||
for line_number, raw_line in enumerate(content.splitlines(), start=1):
|
for line_number, line in _iter_logical_records(content):
|
||||||
line = raw_line.strip()
|
record = _parse_record(line, line_number)
|
||||||
if not line:
|
for key in record:
|
||||||
continue
|
|
||||||
|
|
||||||
tokens = _tokenize_line(line)
|
|
||||||
|
|
||||||
record: dict[str, str] = {}
|
|
||||||
for token in tokens:
|
|
||||||
if "=" not in token:
|
|
||||||
raise LogParseError(
|
|
||||||
f"Line {line_number}: token '{token}' is missing '='."
|
|
||||||
)
|
|
||||||
|
|
||||||
key, value = token.split("=", 1)
|
|
||||||
if not key:
|
|
||||||
raise LogParseError(f"Line {line_number}: empty key is not allowed.")
|
|
||||||
|
|
||||||
record[key] = value
|
|
||||||
seen_keys.setdefault(key, None)
|
seen_keys.setdefault(key, None)
|
||||||
|
|
||||||
records.append(record)
|
records.append(record)
|
||||||
|
|||||||
@@ -58,3 +58,19 @@ def test_parse_log_file_tolerates_unterminated_quotes():
|
|||||||
records, _union_keys = parse_log_file(stream)
|
records, _union_keys = parse_log_file(stream)
|
||||||
|
|
||||||
assert records[0]["msg"] == "broken quoted value"
|
assert records[0]["msg"] == "broken quoted value"
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_log_file_rebuilds_record_after_embedded_newlines():
    """A record whose quoted value contains raw newlines is reassembled into one entry."""
    stream = io.BytesIO(
        b'v015xxxxdate=2024-02-15 time=09:10:11 msg="hello\n'
        b'broken-fragment\n'
        b'world" action=Alert\n'
        b'v015xxxxdate=2024-02-15 time=09:10:12 msg="next" action=Monitor\n'
    )

    records, _union_keys = parse_log_file(stream)

    assert len(records) == 2
    # Continuation fragments are concatenated (stripped) into the quoted value.
    assert records[0]["msg"] == "hellobroken-fragmentworld"
    assert records[0]["action"] == "Alert"
    assert records[1]["msg"] == "next"
|
|||||||
Reference in New Issue
Block a user