Files
webfortilog/app/services/parser.py
2026-04-24 15:00:43 +02:00

99 lines
2.7 KiB
Python

from collections import OrderedDict
from io import BufferedIOBase, TextIOBase
class LogParseError(ValueError):
    """Signal that an uploaded log file has content the parser cannot handle.

    Subclasses ``ValueError`` so callers that already catch ``ValueError``
    keep working.
    """
def _decode_log_content(raw_bytes: bytes | str) -> str:
"""Decode uploaded log content using practical text encodings seen in exports."""
if isinstance(raw_bytes, str):
return raw_bytes
for encoding in ("utf-8-sig", "cp1252", "latin-1"):
try:
return raw_bytes.decode(encoding)
except UnicodeDecodeError:
continue
raise UnicodeDecodeError("unknown", b"", 0, 1, "Unsupported text encoding.")
def _tokenize_line(line: str) -> list[str]:
"""Split a line using shell-like rules while tolerating unmatched trailing quotes."""
tokens: list[str] = []
current: list[str] = []
quote_char: str | None = None
escape_next = False
for char in line:
if escape_next:
current.append(char)
escape_next = False
continue
if char == "\\":
escape_next = True
continue
if quote_char is not None:
if char == quote_char:
quote_char = None
else:
current.append(char)
continue
if char in {'"', "'"}:
quote_char = char
continue
if char.isspace():
if current:
tokens.append("".join(current))
current = []
continue
current.append(char)
if escape_next:
current.append("\\")
if current:
tokens.append("".join(current))
return tokens
def parse_log_file(stream: BufferedIOBase | TextIOBase) -> tuple[list[dict[str, str]], list[str]]:
    """Parse a log stream whose non-blank lines consist of ``key=value`` tokens.

    The stream is read in full, decoded with ``_decode_log_content``, split
    into lines, and each non-blank line is tokenized with ``_tokenize_line``.
    If a key occurs more than once on a line, the last value wins.

    Args:
        stream: A binary or text stream positioned at the start of the log.

    Returns:
        A ``(records, keys)`` pair: ``records`` holds one dict per non-blank
        line, and ``keys`` lists every key encountered across all lines in
        order of first appearance.

    Raises:
        LogParseError: If a token contains no ``=`` or has nothing before the
            first ``=``. The message carries the 1-based line number, counted
            over all lines including blank ones.
    """
    text = _decode_log_content(stream.read())

    parsed_rows: list[dict[str, str]] = []
    # A dict used as an insertion-ordered set of the keys seen so far.
    key_order: dict[str, None] = {}

    for line_number, raw_line in enumerate(text.splitlines(), start=1):
        stripped = raw_line.strip()
        if not stripped:
            continue

        row: dict[str, str] = {}
        for token in _tokenize_line(stripped):
            # Split on the first "=" only, so values may contain "=" themselves.
            key, separator, value = token.partition("=")
            if not separator:
                raise LogParseError(
                    f"Line {line_number}: token '{token}' is missing '='."
                )
            if not key:
                raise LogParseError(f"Line {line_number}: empty key is not allowed.")
            row[key] = value
            key_order.setdefault(key, None)
        parsed_rows.append(row)

    return parsed_rows, list(key_order)