Reduce conversion memory footprint

Alfredo Di Stasio
2026-04-27 11:44:40 +02:00
parent 9313b54abb
commit f9f792f6a1
10 changed files with 324 additions and 102 deletions

View File

@@ -0,0 +1,47 @@
+from dataclasses import dataclass
+from pathlib import Path
+
+from app.services.exporter import ExportResult
+from app.services.parser import create_parse_session
+from app.services.processing import ProcessingOptions, filter_records, sort_records
+from app.services.storage import ResultMetadata, persist_result
+
+
+@dataclass(slots=True)
+class ConversionResult:
+    metadata: ResultMetadata
+    export_result: ExportResult
+    parsed_count: int
+    filtered_count: int
+
+
+def convert_uploaded_log(
+    stream,
+    options: ProcessingOptions,
+    output_dir: Path,
+    output_format: str,
+    preview_record_limit: int,
+) -> ConversionResult:
+    """Convert an uploaded log into a persisted export with a small in-memory preview.
+
+    Parsing, filtering, and export writing are streamed to keep memory usage low.
+    Sorting still materializes the filtered records because global ordering by datetime
+    or severity requires seeing the whole filtered result set first.
+    """
+    parse_session = create_parse_session(stream)
+    sorted_records = sort_records(filter_records(parse_session.iter_records(), options), options)
+    metadata, export_result = persist_result(
+        output_dir=output_dir,
+        records=sorted_records,
+        union_keys=parse_session.union_keys(),
+        mode=options.mode,
+        output_format=output_format,
+        preview_record_limit=preview_record_limit,
+    )
+    return ConversionResult(
+        metadata=metadata,
+        export_result=export_result,
+        parsed_count=parse_session.parsed_count,
+        filtered_count=len(sorted_records),
+    )
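
A minimal sketch of how a caller could drive the new entry point. The new module's import path is not visible in this diff, and the constructor fields of ProcessingOptions are not shown, so `options` below is a placeholder built elsewhere:

# Hypothetical caller: convert_uploaded_log's module path is not shown in this
# diff, and `options` stands in for a ProcessingOptions built from the form.
from pathlib import Path

with open("upload.log", "rb") as stream:
    result = convert_uploaded_log(
        stream=stream,
        options=options,
        output_dir=Path("/tmp/results"),
        output_format="csv",
        preview_record_limit=20,
    )

# Only counters and the preview stay in memory; the full export is on disk.
print(result.parsed_count, result.filtered_count)
print(result.export_result.preview_text)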

View File

@@ -1,69 +1,107 @@
 import csv
 import io
 from dataclasses import dataclass
+from pathlib import Path
+from typing import Sequence, TextIO
 
 from app.constants import VENDOR_FIELDS
 
 
 @dataclass(slots=True)
 class ExportResult:
-    content: str
     columns: list[str]
     output_format: str
+    preview_text: str
 
-    def preview(self, record_limit: int) -> str:
-        """Build a small preview string for the result page."""
-        if self.output_format == "text":
-            marker = f"--- record {record_limit + 1} ---"
-            if marker in self.content:
-                return self.content.split(marker, 1)[0].rstrip()
-            return self.content
-        lines = self.content.splitlines()
-        if len(lines) <= record_limit + 1:
-            return self.content
-        return "\n".join(lines[: record_limit + 1])
+    def preview(self, _record_limit: int) -> str:
+        """Return the preview that was collected during export writing."""
+        return self.preview_text
 
 
-def build_export(
-    records: list[dict[str, str]],
+def write_export(
+    file_path: Path,
+    records: Sequence[dict[str, str]],
     union_keys: list[str],
     mode: str,
     output_format: str,
+    preview_record_limit: int,
 ) -> ExportResult:
+    """Write the final export directly to disk and keep only a small preview in memory."""
     columns = VENDOR_FIELDS if mode == "vendor" else union_keys
-    if output_format == "text":
-        return ExportResult(
-            content=_render_text(records, columns),
-            columns=columns,
-            output_format=output_format,
-        )
+    with file_path.open("w", encoding="utf-8", newline="") as export_file:
+        if output_format == "text":
+            preview_text = _write_text(
+                export_file=export_file,
+                records=records,
+                columns=columns,
+                preview_record_limit=preview_record_limit,
+            )
+        else:
+            preview_text = _write_csv(
+                export_file=export_file,
+                records=records,
+                columns=columns,
+                preview_record_limit=preview_record_limit,
+            )
     return ExportResult(
-        content=_render_csv(records, columns),
         columns=columns,
         output_format=output_format,
+        preview_text=preview_text,
     )
 
 
-def _render_text(records: list[dict[str, str]], columns: list[str]) -> str:
+def _write_text(
+    export_file: TextIO,
+    records: Sequence[dict[str, str]],
+    columns: list[str],
+    preview_record_limit: int,
+) -> str:
     max_key_length = max((len(column) for column in columns), default=0)
-    chunks: list[str] = []
+    preview_lines: list[str] = []
+    wrote_line = False
     for index, record in enumerate(records, start=1):
-        chunks.append(f"--- record {index} ---")
+        header = f"--- record {index} ---"
+        wrote_line = _write_line(export_file, header, wrote_line)
+        if index <= preview_record_limit:
+            preview_lines.append(header)
         for column in columns:
-            value = record.get(column, "")
-            chunks.append(f" {column.ljust(max_key_length)} = {value}")
+            line = f" {column.ljust(max_key_length)} = {record.get(column, '')}"
+            wrote_line = _write_line(export_file, line, wrote_line)
+            if index <= preview_record_limit:
+                preview_lines.append(line)
-    return "\n".join(chunks)
+    return "\n".join(preview_lines)
 
 
-def _render_csv(records: list[dict[str, str]], columns: list[str]) -> str:
-    buffer = io.StringIO()
-    writer = csv.DictWriter(buffer, fieldnames=columns, extrasaction="ignore")
+def _write_csv(
+    export_file: TextIO,
+    records: Sequence[dict[str, str]],
+    columns: list[str],
+    preview_record_limit: int,
+) -> str:
+    writer = csv.DictWriter(export_file, fieldnames=columns, extrasaction="ignore")
     writer.writeheader()
-    for record in records:
-        writer.writerow({column: record.get(column, "") for column in columns})
-    return buffer.getvalue()
+    preview_buffer = io.StringIO()
+    preview_writer = csv.DictWriter(preview_buffer, fieldnames=columns, extrasaction="ignore")
+    preview_writer.writeheader()
+    for index, record in enumerate(records, start=1):
+        row = {column: record.get(column, "") for column in columns}
+        writer.writerow(row)
+        if index <= preview_record_limit:
+            preview_writer.writerow(row)
+    return preview_buffer.getvalue().rstrip("\n")
+
+
+def _write_line(export_file: TextIO, line: str, wrote_line: bool) -> bool:
+    """Write lines without leaving a trailing newline at the end of the file."""
+    if wrote_line:
+        export_file.write("\n")
+    export_file.write(line)
+    return True
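
A quick illustrative check of the new contract, with made-up records: the preview is captured while the export is written, so ExportResult.preview() no longer recomputes anything from the full content. Any mode other than "vendor" selects union_keys; "union" below is just a stand-in value:

import tempfile
from pathlib import Path

from app.services.exporter import write_export

records = [{"policy": "allow", "severity_level": "low"}] * 100  # made-up rows
with tempfile.TemporaryDirectory() as tmp:
    export = write_export(
        file_path=Path(tmp) / "demo.csv",
        records=records,
        union_keys=["policy", "severity_level"],
        mode="union",  # assumed non-"vendor" value, so union_keys wins
        output_format="csv",
        preview_record_limit=10,
    )
    # The header plus the first ten rows were buffered during the write;
    # the limit argument is ignored at read time.
    assert len(export.preview(10).splitlines()) == 11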

View File

@@ -1,4 +1,6 @@
+import codecs
 from collections import OrderedDict
+from dataclasses import dataclass, field
 from io import BufferedIOBase, TextIOBase
 import re
 
@@ -10,18 +12,35 @@ class LogParseError(ValueError):
"""Raised when the uploaded log file cannot be parsed."""
def _decode_log_content(raw_bytes: bytes | str) -> str:
"""Decode uploaded log content using practical text encodings seen in exports."""
if isinstance(raw_bytes, str):
return raw_bytes
@dataclass(slots=True)
class ParseSession:
"""Stateful streamed parser for uploaded log files."""
for encoding in ("utf-8-sig", "cp1252", "latin-1"):
try:
return raw_bytes.decode(encoding)
except UnicodeDecodeError:
continue
stream: BufferedIOBase | TextIOBase
encoding: str | None
_union_keys: OrderedDict[str, None] = field(default_factory=OrderedDict)
parsed_count: int = 0
_consumed: bool = False
raise UnicodeDecodeError("unknown", b"", 0, 1, "Unsupported text encoding.")
def iter_records(self):
if self._consumed:
raise RuntimeError("ParseSession records can only be consumed once.")
self._consumed = True
for line_number, line in _iter_logical_records(_iter_physical_lines(self.stream, self.encoding)):
record = _parse_record(line, line_number)
for key in record:
self._union_keys.setdefault(key, None)
self.parsed_count += 1
yield record
def union_keys(self) -> list[str]:
return list(self._union_keys.keys())
def create_parse_session(stream: BufferedIOBase | TextIOBase) -> ParseSession:
"""Prepare a streamed parser session without materializing the full upload in memory."""
return ParseSession(stream=stream, encoding=_resolve_stream_encoding(stream))
def _normalize_value(value: str) -> str:
@@ -34,6 +53,80 @@ def _normalize_value(value: str) -> str:
     return value
 
 
+def _resolve_stream_encoding(stream: BufferedIOBase | TextIOBase) -> str | None:
+    """Detect the most suitable stream encoding without reading the full file into memory."""
+    probe = stream.read(0)
+    if isinstance(probe, str):
+        return None
+
+    for encoding in ("utf-8-sig", "cp1252", "latin-1"):
+        try:
+            _validate_stream_encoding(stream, encoding)
+            return encoding
+        except UnicodeDecodeError:
+            continue
+
+    raise UnicodeDecodeError("unknown", b"", 0, 1, "Unsupported text encoding.")
+
+
+def _validate_stream_encoding(stream: BufferedIOBase | TextIOBase, encoding: str) -> None:
+    """Scan the stream to verify that the candidate encoding can decode it fully."""
+    _rewind_stream(stream)
+    decoder = codecs.getincrementaldecoder(encoding)()
+    for chunk in iter(lambda: stream.read(64 * 1024), b""):
+        decoder.decode(chunk, final=False)
+    decoder.decode(b"", final=True)
+    _rewind_stream(stream)
+
+
+def _iter_physical_lines(
+    stream: BufferedIOBase | TextIOBase,
+    encoding: str | None,
+):
+    """Yield decoded physical lines from the uploaded stream without full-file buffering."""
+    _rewind_stream(stream)
+    if encoding is None:
+        for line_number, raw_line in enumerate(stream, start=1):
+            yield line_number, raw_line
+        return
+
+    line_number = 1
+    decoder = codecs.getincrementaldecoder(encoding)()
+    pending = ""
+    for chunk in iter(lambda: stream.read(64 * 1024), b""):
+        text = decoder.decode(chunk, final=False)
+        pending += text
+        while True:
+            newline_index = pending.find("\n")
+            if newline_index == -1:
+                break
+            line = pending[: newline_index + 1]
+            pending = pending[newline_index + 1 :]
+            yield line_number, line
+            line_number += 1
+    pending += decoder.decode(b"", final=True)
+    while True:
+        newline_index = pending.find("\n")
+        if newline_index == -1:
+            break
+        line = pending[: newline_index + 1]
+        pending = pending[newline_index + 1 :]
+        yield line_number, line
+        line_number += 1
+    if pending:
+        yield line_number, pending
+
+
+def _rewind_stream(stream: BufferedIOBase | TextIOBase) -> None:
+    """Move the uploaded stream back to the start."""
+    if not hasattr(stream, "seek"):
+        raise LogParseError("The uploaded file stream is not seekable.")
+    stream.seek(0)
+
+
 def _parse_record(line: str, line_number: int) -> dict[str, str]:
     """Parse a logical record by locating `key=` boundaries instead of splitting on spaces."""
     matches = list(KEY_PATTERN.finditer(line))
@@ -58,20 +151,19 @@ def _parse_record(line: str, line_number: int) -> dict[str, str]:
     return record
 
 
-def _iter_logical_records(content: str) -> list[tuple[int, str]]:
+def _iter_logical_records(physical_lines):
     """Rebuild logical records when embedded newlines split a single log entry."""
-    records: list[tuple[int, str]] = []
     current_record: list[str] = []
     current_start_line: int | None = None
 
-    for line_number, raw_line in enumerate(content.splitlines(), start=1):
+    for line_number, raw_line in physical_lines:
         line = raw_line.strip()
         if not line:
            continue
 
         if line.startswith(RECORD_PREFIX):
             if current_record and current_start_line is not None:
-                records.append((current_start_line, "".join(current_record)))
+                yield current_start_line, "".join(current_record)
             current_record = [line]
             current_start_line = line_number
             continue
@@ -85,24 +177,11 @@ def _iter_logical_records(content: str) -> list[tuple[int, str]]:
         )
 
     if current_record and current_start_line is not None:
-        records.append((current_start_line, "".join(current_record)))
-    return records
+        yield current_start_line, "".join(current_record)
 
 
 def parse_log_file(stream: BufferedIOBase | TextIOBase) -> tuple[list[dict[str, str]], list[str]]:
-    """Parse a text log file where each line contains shell-like key/value tokens."""
-    raw_bytes = stream.read()
-    content = _decode_log_content(raw_bytes)
-
-    records: list[dict[str, str]] = []
-    seen_keys: OrderedDict[str, None] = OrderedDict()
-    for line_number, line in _iter_logical_records(content):
-        record = _parse_record(line, line_number)
-        for key in record:
-            seen_keys.setdefault(key, None)
-        records.append(record)
-
-    return records, list(seen_keys.keys())
+    """Compatibility helper that still materializes all parsed records when needed."""
+    session = create_parse_session(stream)
+    records = list(session.iter_records())
+    return records, session.union_keys()
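
The incremental decoder is what makes the chunked reads in _validate_stream_encoding and _iter_physical_lines safe: a multi-byte character split across two chunks still decodes, because the decoder buffers the partial byte sequence between calls. A standalone sketch of that pattern, with a tiny read size chosen to force a split mid-character:

import codecs
import io

raw = "msg=caffè\n".encode("utf-8")  # "è" occupies two bytes
decoder = codecs.getincrementaldecoder("utf-8")()
stream = io.BytesIO(raw)
pieces = []
for chunk in iter(lambda: stream.read(9), b""):  # 9-byte reads split "è" in half
    pieces.append(decoder.decode(chunk, final=False))
pieces.append(decoder.decode(b"", final=True))
assert "".join(pieces) == "msg=caffè\n"  # naive raw[:9].decode() would raise

The single-use guard on ParseSession.iter_records fits the same design: parsed_count and the key union accumulate during the one streaming pass, and a second pass over the rewound stream would double-count them.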

View File

@@ -1,5 +1,6 @@
 from dataclasses import dataclass
 from datetime import datetime
+from typing import Iterable
 
 from app.constants import SEVERITY_RANKING
@@ -20,11 +21,9 @@ class ProcessingOptions:
 def filter_records(
-    records: list[dict[str, str]], options: ProcessingOptions
-) -> list[dict[str, str]]:
-    """Apply user-selected filters to parsed records."""
-    filtered: list[dict[str, str]] = []
+    records: Iterable[dict[str, str]], options: ProcessingOptions
+) -> Iterable[dict[str, str]]:
+    """Apply user-selected filters lazily to parsed records."""
     for record in records:
         policy_value = record.get("policy", "")
         severity_value = record.get("severity_level", "")
@@ -38,13 +37,11 @@ def filter_records(
         if options.severity_ci and options.severity_ci.lower() not in severity_value.lower():
             continue
-        filtered.append(record)
-
-    return filtered
+        yield record
 
 
 def sort_records(
-    records: list[dict[str, str]], options: ProcessingOptions
+    records: Iterable[dict[str, str]], options: ProcessingOptions
 ) -> list[dict[str, str]]:
     """Sort records by datetime or severity using the requested order."""
     reverse = options.order == "desc"
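
With filter_records now a generator, no record is examined until something downstream pulls on the pipeline, and only sort_records forces a list into memory. A generic mirror of that shape with stand-in data (the real functions need a ProcessingOptions, which is not constructed here):

# Stand-ins that mirror the new pipeline shape: a lazy filter feeding an eager sort.
def fake_parse():  # stands in for ParseSession.iter_records
    for i in range(1_000_000):
        yield {"severity_level": "high" if i % 2 == 0 else "low"}


def fake_filter(records):  # stands in for the generator form of filter_records
    for record in records:
        if record["severity_level"] == "high":
            yield record


kept = sorted(fake_filter(fake_parse()), key=lambda r: r["severity_level"])
print(len(kept))  # 500000; the "low" records were dropped as they streamed past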

View File

@@ -3,7 +3,7 @@ import uuid
 from dataclasses import asdict, dataclass
 from pathlib import Path
 
-from app.services.exporter import ExportResult
+from app.services.exporter import ExportResult, write_export
 
 
 @dataclass(slots=True)
@@ -14,16 +14,30 @@ class ResultMetadata:
     mimetype: str
 
 
-def persist_result(output_dir: Path, export_result: ExportResult) -> ResultMetadata:
+def persist_result(
+    output_dir: Path,
+    records: list[dict[str, str]],
+    union_keys: list[str],
+    mode: str,
+    output_format: str,
+    preview_record_limit: int,
+) -> tuple[ResultMetadata, ExportResult]:
     """Persist generated output and sidecar metadata in a temporary directory."""
     result_id = uuid.uuid4().hex
-    extension = "txt" if export_result.output_format == "text" else "csv"
+    extension = "txt" if output_format == "text" else "csv"
     mimetype = "text/plain; charset=utf-8" if extension == "txt" else "text/csv; charset=utf-8"
     file_path = output_dir / f"{result_id}.{extension}"
     metadata_path = output_dir / f"{result_id}.json"
-    file_path.write_text(export_result.content, encoding="utf-8")
+    export_result = write_export(
+        file_path=file_path,
+        records=records,
+        union_keys=union_keys,
+        mode=mode,
+        output_format=output_format,
+        preview_record_limit=preview_record_limit,
+    )
     metadata = ResultMetadata(
         result_id=result_id,
         file_path=str(file_path),
@@ -31,7 +45,7 @@ def persist_result(output_dir: Path, export_result: ExportResult) -> ResultMetad
         mimetype=mimetype,
     )
     metadata_path.write_text(json.dumps(asdict(metadata)), encoding="utf-8")
-    return metadata
+    return metadata, export_result
 
 
 def load_result_metadata(output_dir: Path, result_id: str) -> dict[str, str] | None:
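
persist_result now drives the export writing itself and hands back both the sidecar metadata and the in-memory preview. A hypothetical round trip under a temporary directory; the records, keys, and "union" mode are invented for illustration:

import json
import tempfile
from pathlib import Path

from app.services.storage import persist_result

with tempfile.TemporaryDirectory() as tmp:
    metadata, export = persist_result(
        output_dir=Path(tmp),
        records=[{"policy": "allow"}],
        union_keys=["policy"],
        mode="union",  # assumed non-"vendor" value
        output_format="csv",
        preview_record_limit=5,
    )
    # The metadata sidecar sits next to the export, keyed by the same id.
    sidecar = Path(metadata.file_path).with_suffix(".json")
    assert json.loads(sidecar.read_text())["result_id"] == metadata.result_id
    assert export.preview_text.splitlines()[0] == "policy"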