diff --git a/README.md b/README.md
index 17006cc..e56b9e4 100644
--- a/README.md
+++ b/README.md
@@ -128,6 +128,8 @@ curl -X POST http://127.0.0.1:5000/convert \
 - Temporary output files are written to `instance/outputs`
 - The application does not require a database
 - Gunicorn is used as the production WSGI server
+- Parsing and export writing are streamed to reduce memory usage on large uploads
+- Sorting still materializes the filtered record set because global ordering by datetime or severity requires the full filtered input
 - Default upload limit is 100 MiB
 - Set `MAX_UPLOAD_SIZE_MB` to configure the upload limit in megabytes
 - `MAX_CONTENT_LENGTH` is also supported as a lower-level byte-based override
diff --git a/app/routes.py b/app/routes.py
index 0aff113..a79b245 100644
--- a/app/routes.py
+++ b/app/routes.py
@@ -14,15 +14,10 @@ from flask import (
 from werkzeug.datastructures import FileStorage

 from app.constants import MODES, OUTPUT_FORMATS, SORTABLE_FIELDS, SORT_ORDERS
-from app.services.exporter import build_export
-from app.services.parser import LogParseError, parse_log_file
-from app.services.processing import (
-    ProcessingError,
-    ProcessingOptions,
-    filter_records,
-    sort_records,
-)
-from app.services.storage import load_result_metadata, persist_result
+from app.services.conversion import convert_uploaded_log
+from app.services.parser import LogParseError
+from app.services.processing import ProcessingError, ProcessingOptions
+from app.services.storage import load_result_metadata

 main_blueprint = Blueprint("main", __name__)

@@ -95,7 +90,6 @@ def convert():
     assert uploaded_file is not None

     try:
-        records, union_keys = parse_log_file(uploaded_file.stream)
         options = ProcessingOptions(
             policy_cs=form.policy_cs,
             policy_ci=form.policy_ci,
@@ -105,12 +99,12 @@
             order=form.order,
             mode=form.mode,
         )
-        filtered_records = filter_records(records, options)
-        sorted_records = sort_records(filtered_records, options)
-        export_result = build_export(sorted_records, union_keys, form.mode, form.output_format)
-        metadata = persist_result(
+        conversion_result = convert_uploaded_log(
+            stream=uploaded_file.stream,
+            options=options,
             output_dir=current_app.config["OUTPUT_DIRECTORY"],
-            export_result=export_result,
+            output_format=form.output_format,
+            preview_record_limit=current_app.config["PREVIEW_RECORD_LIMIT"],
         )
     except (LogParseError, ProcessingError) as exc:
         flash(str(exc), "danger")
@@ -122,15 +116,16 @@
         )
         return render_template("index.html", form=form), 400

-    preview_limit = current_app.config["PREVIEW_RECORD_LIMIT"]
     return render_template(
         "result.html",
-        result_id=metadata.result_id,
-        preview_text=export_result.preview(preview_limit),
+        result_id=conversion_result.metadata.result_id,
+        preview_text=conversion_result.export_result.preview(
+            current_app.config["PREVIEW_RECORD_LIMIT"]
+        ),
         output_format=form.output_format,
-        record_count=len(sorted_records),
-        parsed_count=len(records),
-        filtered_count=len(sorted_records),
+        record_count=conversion_result.filtered_count,
+        parsed_count=conversion_result.parsed_count,
+        filtered_count=conversion_result.filtered_count,
         mode=form.mode,
         sort_by=form.sort_by,
         order=form.order,
diff --git a/app/services/conversion.py b/app/services/conversion.py
new file mode 100644
index 0000000..4ee4edc
--- /dev/null
+++ b/app/services/conversion.py
@@ -0,0 +1,47 @@
+from dataclasses import dataclass
+from pathlib import Path
+
+from app.services.exporter import ExportResult
+from app.services.parser import create_parse_session
+from app.services.processing import ProcessingOptions, filter_records, sort_records
+from app.services.storage import ResultMetadata, persist_result
+
+
+@dataclass(slots=True)
+class ConversionResult:
+    metadata: ResultMetadata
+    export_result: ExportResult
+    parsed_count: int
+    filtered_count: int
+
+
+def convert_uploaded_log(
+    stream,
+    options: ProcessingOptions,
+    output_dir: Path,
+    output_format: str,
+    preview_record_limit: int,
+) -> ConversionResult:
+    """Convert an uploaded log into a persisted export with a small in-memory preview.
+
+    Parsing, filtering, and export writing are streamed to keep memory usage low.
+    Sorting still materializes the filtered records because global ordering by datetime
+    or severity requires seeing the whole filtered result set first.
+    """
+    parse_session = create_parse_session(stream)
+    sorted_records = sort_records(filter_records(parse_session.iter_records(), options), options)
+    metadata, export_result = persist_result(
+        output_dir=output_dir,
+        records=sorted_records,
+        union_keys=parse_session.union_keys(),
+        mode=options.mode,
+        output_format=output_format,
+        preview_record_limit=preview_record_limit,
+    )
+
+    return ConversionResult(
+        metadata=metadata,
+        export_result=export_result,
+        parsed_count=parse_session.parsed_count,
+        filtered_count=len(sorted_records),
+    )
diff --git a/app/services/exporter.py b/app/services/exporter.py
index 917883b..b097b57 100644
--- a/app/services/exporter.py
+++ b/app/services/exporter.py
@@ -1,69 +1,107 @@
 import csv
 import io
 from dataclasses import dataclass
+from pathlib import Path
+from typing import Sequence, TextIO

 from app.constants import VENDOR_FIELDS


 @dataclass(slots=True)
 class ExportResult:
-    content: str
     columns: list[str]
     output_format: str
+    preview_text: str

-    def preview(self, record_limit: int) -> str:
-        """Build a small preview string for the result page."""
-        if self.output_format == "text":
-            marker = f"--- record {record_limit + 1} ---"
-            if marker in self.content:
-                return self.content.split(marker, 1)[0].rstrip()
-            return self.content
-
-        lines = self.content.splitlines()
-        if len(lines) <= record_limit + 1:
-            return self.content
-        return "\n".join(lines[: record_limit + 1])
+    def preview(self, _record_limit: int) -> str:
+        """Return the preview that was collected during export writing."""
+        return self.preview_text


-def build_export(
-    records: list[dict[str, str]],
+def write_export(
+    file_path: Path,
+    records: Sequence[dict[str, str]],
     union_keys: list[str],
     mode: str,
     output_format: str,
+    preview_record_limit: int,
 ) -> ExportResult:
+    """Write the final export directly to disk and keep only a small preview in memory."""
     columns = VENDOR_FIELDS if mode == "vendor" else union_keys

-    if output_format == "text":
-        return ExportResult(
-            content=_render_text(records, columns),
-            columns=columns,
-            output_format=output_format,
-        )
+    with file_path.open("w", encoding="utf-8", newline="") as export_file:
+        if output_format == "text":
+            preview_text = _write_text(
+                export_file=export_file,
+                records=records,
+                columns=columns,
+                preview_record_limit=preview_record_limit,
+            )
+        else:
+            preview_text = _write_csv(
+                export_file=export_file,
+                records=records,
+                columns=columns,
+                preview_record_limit=preview_record_limit,
+            )

     return ExportResult(
-        content=_render_csv(records, columns),
         columns=columns,
         output_format=output_format,
+        preview_text=preview_text,
     )


-def _render_text(records: list[dict[str, str]], columns: list[str]) -> str:
+def _write_text(
+    export_file: TextIO,
+    records: Sequence[dict[str, str]],
+    columns: list[str],
+    preview_record_limit: int,
+) -> str:
     max_key_length = max((len(column) for column in columns), default=0)
-    chunks: list[str] = []
+    preview_lines: list[str] = []
+    wrote_line = False

     for index, record in enumerate(records, start=1):
-        chunks.append(f"--- record {index} ---")
+        header = f"--- record {index} ---"
+        wrote_line = _write_line(export_file, header, wrote_line)
+        if index <= preview_record_limit:
+            preview_lines.append(header)
+
         for column in columns:
-            value = record.get(column, "")
-            chunks.append(f" {column.ljust(max_key_length)} = {value}")
+            line = f" {column.ljust(max_key_length)} = {record.get(column, '')}"
+            wrote_line = _write_line(export_file, line, wrote_line)
+            if index <= preview_record_limit:
+                preview_lines.append(line)

-    return "\n".join(chunks)
+    return "\n".join(preview_lines)


-def _render_csv(records: list[dict[str, str]], columns: list[str]) -> str:
-    buffer = io.StringIO()
-    writer = csv.DictWriter(buffer, fieldnames=columns, extrasaction="ignore")
+def _write_csv(
+    export_file: TextIO,
+    records: Sequence[dict[str, str]],
+    columns: list[str],
+    preview_record_limit: int,
+) -> str:
+    writer = csv.DictWriter(export_file, fieldnames=columns, extrasaction="ignore")
     writer.writeheader()
-    for record in records:
-        writer.writerow({column: record.get(column, "") for column in columns})
-    return buffer.getvalue()
+
+    preview_buffer = io.StringIO()
+    preview_writer = csv.DictWriter(preview_buffer, fieldnames=columns, extrasaction="ignore")
+    preview_writer.writeheader()
+
+    for index, record in enumerate(records, start=1):
+        row = {column: record.get(column, "") for column in columns}
+        writer.writerow(row)
+        if index <= preview_record_limit:
+            preview_writer.writerow(row)
+
+    return preview_buffer.getvalue().rstrip("\n")
+
+
+def _write_line(export_file: TextIO, line: str, wrote_line: bool) -> bool:
+    """Write lines without leaving a trailing newline at the end of the file."""
+    if wrote_line:
+        export_file.write("\n")
+    export_file.write(line)
+    return True
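For reference, a minimal sketch of calling the new `write_export` helper directly; the output path, records, and preview limit are invented for illustration, and the target directory is assumed to exist:

    from pathlib import Path

    from app.services.exporter import write_export

    export_result = write_export(
        file_path=Path("instance/outputs/example.csv"),
        records=[
            {"policy": "Prod Policy", "severity_level": "high"},
            {"policy": "Other Policy", "severity_level": "low"},
        ],
        union_keys=["policy", "severity_level"],
        mode="full",
        output_format="csv",
        preview_record_limit=1,
    )
    # Both rows are written to disk, but the in-memory preview holds only the
    # header plus the first data row.
    print(export_result.preview(1))

Only the header row and at most `preview_record_limit` data rows are ever buffered, which is what keeps the result-page preview cheap for large uploads.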
diff --git a/app/services/parser.py b/app/services/parser.py
index c3e4c14..7ecd41d 100644
--- a/app/services/parser.py
+++ b/app/services/parser.py
@@ -1,4 +1,6 @@
+import codecs
 from collections import OrderedDict
+from dataclasses import dataclass, field
 from io import BufferedIOBase, TextIOBase
 import re

@@ -10,18 +12,35 @@
 class LogParseError(ValueError):
     """Raised when the uploaded log file cannot be parsed."""

-def _decode_log_content(raw_bytes: bytes | str) -> str:
-    """Decode uploaded log content using practical text encodings seen in exports."""
-    if isinstance(raw_bytes, str):
-        return raw_bytes
+@dataclass(slots=True)
+class ParseSession:
+    """Stateful streamed parser for uploaded log files."""

-    for encoding in ("utf-8-sig", "cp1252", "latin-1"):
-        try:
-            return raw_bytes.decode(encoding)
-        except UnicodeDecodeError:
-            continue
+    stream: BufferedIOBase | TextIOBase
+    encoding: str | None
+    _union_keys: OrderedDict[str, None] = field(default_factory=OrderedDict)
+    parsed_count: int = 0
+    _consumed: bool = False

-    raise UnicodeDecodeError("unknown", b"", 0, 1, "Unsupported text encoding.")
+    def iter_records(self):
+        if self._consumed:
+            raise RuntimeError("ParseSession records can only be consumed once.")
+
+        self._consumed = True
+        for line_number, line in _iter_logical_records(_iter_physical_lines(self.stream, self.encoding)):
+            record = _parse_record(line, line_number)
+            for key in record:
+                self._union_keys.setdefault(key, None)
+            self.parsed_count += 1
+            yield record
+
+    def union_keys(self) -> list[str]:
+        return list(self._union_keys.keys())
+
+
+def create_parse_session(stream: BufferedIOBase | TextIOBase) -> ParseSession:
+    """Prepare a streamed parser session without materializing the full upload in memory."""
+    return ParseSession(stream=stream, encoding=_resolve_stream_encoding(stream))


 def _normalize_value(value: str) -> str:
@@ -34,6 +53,80 @@
     return value


+def _resolve_stream_encoding(stream: BufferedIOBase | TextIOBase) -> str | None:
+    """Detect the most suitable stream encoding without reading the full file into memory."""
+    probe = stream.read(0)
+    if isinstance(probe, str):
+        return None
+
+    for encoding in ("utf-8-sig", "cp1252", "latin-1"):
+        try:
+            _validate_stream_encoding(stream, encoding)
+            return encoding
+        except UnicodeDecodeError:
+            continue
+
+    raise UnicodeDecodeError("unknown", b"", 0, 1, "Unsupported text encoding.")
+
+
+def _validate_stream_encoding(stream: BufferedIOBase | TextIOBase, encoding: str) -> None:
+    """Scan the stream to verify that the candidate encoding can decode it fully."""
+    _rewind_stream(stream)
+    decoder = codecs.getincrementaldecoder(encoding)()
+    for chunk in iter(lambda: stream.read(64 * 1024), b""):
+        decoder.decode(chunk, final=False)
+    decoder.decode(b"", final=True)
+    _rewind_stream(stream)
+
+
+def _iter_physical_lines(
+    stream: BufferedIOBase | TextIOBase,
+    encoding: str | None,
+):
+    """Yield decoded physical lines from the uploaded stream without full-file buffering."""
+    _rewind_stream(stream)
+
+    if encoding is None:
+        for line_number, raw_line in enumerate(stream, start=1):
+            yield line_number, raw_line
+        return
+
+    line_number = 1
+    decoder = codecs.getincrementaldecoder(encoding)()
+    pending = ""
+    for chunk in iter(lambda: stream.read(64 * 1024), b""):
+        text = decoder.decode(chunk, final=False)
+        pending += text
+        while True:
+            newline_index = pending.find("\n")
+            if newline_index == -1:
+                break
+            line = pending[: newline_index + 1]
+            pending = pending[newline_index + 1 :]
+            yield line_number, line
+            line_number += 1
+
+    pending += decoder.decode(b"", final=True)
+    while True:
+        newline_index = pending.find("\n")
+        if newline_index == -1:
+            break
+        line = pending[: newline_index + 1]
+        pending = pending[newline_index + 1 :]
+        yield line_number, line
+        line_number += 1
+
+    if pending:
+        yield line_number, pending
+
+
+def _rewind_stream(stream: BufferedIOBase | TextIOBase) -> None:
+    """Move the uploaded stream back to the start."""
+    if not hasattr(stream, "seek"):
+        raise LogParseError("The uploaded file stream is not seekable.")
+    stream.seek(0)
+
+
 def _parse_record(line: str, line_number: int) -> dict[str, str]:
     """Parse a logical record by locating `key=` boundaries instead of splitting on spaces."""
     matches = list(KEY_PATTERN.finditer(line))
@@ -58,20 +151,19 @@
     return record


-def _iter_logical_records(content: str) -> list[tuple[int, str]]:
+def _iter_logical_records(physical_lines):
     """Rebuild logical records when embedded newlines split a single log entry."""
-    records: list[tuple[int, str]] = []
     current_record: list[str] = []
     current_start_line: int | None = None

-    for line_number, raw_line in enumerate(content.splitlines(), start=1):
+    for line_number, raw_line in physical_lines:
         line = raw_line.strip()
         if not line:
             continue

         if line.startswith(RECORD_PREFIX):
             if current_record and current_start_line is not None:
-                records.append((current_start_line, "".join(current_record)))
+                yield current_start_line, "".join(current_record)
             current_record = [line]
             current_start_line = line_number
             continue
@@ -85,24 +177,11 @@
         )

     if current_record and current_start_line is not None:
-        records.append((current_start_line, "".join(current_record)))
-
-    return records
+        yield current_start_line, "".join(current_record)


 def parse_log_file(stream: BufferedIOBase | TextIOBase) -> tuple[list[dict[str, str]], list[str]]:
-    """Parse a text log file where each line contains shell-like key/value tokens."""
-    raw_bytes = stream.read()
-    content = _decode_log_content(raw_bytes)
-
-    records: list[dict[str, str]] = []
-    seen_keys: OrderedDict[str, None] = OrderedDict()
-
-    for line_number, line in _iter_logical_records(content):
-        record = _parse_record(line, line_number)
-        for key in record:
-            seen_keys.setdefault(key, None)
-
-        records.append(record)
-
-    return records, list(seen_keys.keys())
+    """Compatibility helper that still materializes all parsed records when needed."""
+    session = create_parse_session(stream)
+    records = list(session.iter_records())
+    return records, session.union_keys()
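The chunked reads in `_validate_stream_encoding` and `_iter_physical_lines` lean on `codecs.getincrementaldecoder` so that a multi-byte character split across two chunks still decodes correctly. A standalone sketch of that pattern, with made-up sample bytes and a deliberately tiny chunk size:

    import codecs
    import io

    raw = "msg=café status=ok\n".encode("utf-8")
    stream = io.BytesIO(raw)

    decoder = codecs.getincrementaldecoder("utf-8-sig")()
    text = ""
    for chunk in iter(lambda: stream.read(4), b""):
        # A 4-byte chunk splits the two-byte "é" across reads; the incremental
        # decoder buffers the partial sequence instead of raising.
        text += decoder.decode(chunk, final=False)
    text += decoder.decode(b"", final=True)

    assert text == "msg=café status=ok\n"

A plain `chunk.decode("utf-8")` inside that loop would raise UnicodeDecodeError on the chunk that ends mid-character, which is why the streamed parser validates and decodes incrementally.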
diff --git a/app/services/processing.py b/app/services/processing.py
index f18439f..60628d6 100644
--- a/app/services/processing.py
+++ b/app/services/processing.py
@@ -1,5 +1,6 @@
 from dataclasses import dataclass
 from datetime import datetime
+from typing import Iterable

 from app.constants import SEVERITY_RANKING

@@ -20,11 +21,9 @@ class ProcessingOptions:


 def filter_records(
-    records: list[dict[str, str]], options: ProcessingOptions
-) -> list[dict[str, str]]:
-    """Apply user-selected filters to parsed records."""
-    filtered: list[dict[str, str]] = []
-
+    records: Iterable[dict[str, str]], options: ProcessingOptions
+) -> Iterable[dict[str, str]]:
+    """Apply user-selected filters lazily to parsed records."""
     for record in records:
         policy_value = record.get("policy", "")
         severity_value = record.get("severity_level", "")
@@ -38,13 +37,11 @@
         if options.severity_ci and options.severity_ci.lower() not in severity_value.lower():
             continue

-        filtered.append(record)
-
-    return filtered
+        yield record


 def sort_records(
-    records: list[dict[str, str]], options: ProcessingOptions
+    records: Iterable[dict[str, str]], options: ProcessingOptions
 ) -> list[dict[str, str]]:
     """Sort records by datetime or severity using the requested order."""
     reverse = options.order == "desc"
diff --git a/app/services/storage.py b/app/services/storage.py
index d0e3d91..b2dde1b 100644
--- a/app/services/storage.py
+++ b/app/services/storage.py
@@ -3,7 +3,7 @@ import uuid
 from dataclasses import asdict, dataclass
 from pathlib import Path

-from app.services.exporter import ExportResult
+from app.services.exporter import ExportResult, write_export


 @dataclass(slots=True)
@@ -14,16 +14,30 @@ class ResultMetadata:
     mimetype: str


-def persist_result(output_dir: Path, export_result: ExportResult) -> ResultMetadata:
+def persist_result(
+    output_dir: Path,
+    records: list[dict[str, str]],
+    union_keys: list[str],
+    mode: str,
+    output_format: str,
+    preview_record_limit: int,
+) -> tuple[ResultMetadata, ExportResult]:
     """Persist generated output and sidecar metadata in a temporary directory."""
     result_id = uuid.uuid4().hex
-    extension = "txt" if export_result.output_format == "text" else "csv"
+    extension = "txt" if output_format == "text" else "csv"
     mimetype = "text/plain; charset=utf-8" if extension == "txt" else "text/csv; charset=utf-8"
     file_path = output_dir / f"{result_id}.{extension}"
     metadata_path = output_dir / f"{result_id}.json"

-    file_path.write_text(export_result.content, encoding="utf-8")
+    export_result = write_export(
+        file_path=file_path,
+        records=records,
+        union_keys=union_keys,
+        mode=mode,
+        output_format=output_format,
+        preview_record_limit=preview_record_limit,
+    )
     metadata = ResultMetadata(
         result_id=result_id,
         file_path=str(file_path),
@@ -31,7 +45,7 @@ def persist_result(output_dir: Path, export_result: ExportResult) -> ResultMetad
         mimetype=mimetype,
     )
     metadata_path.write_text(json.dumps(asdict(metadata)), encoding="utf-8")
-    return metadata
+    return metadata, export_result


 def load_result_metadata(output_dir: Path, result_id: str) -> dict[str, str] | None:
"text/plain; charset=utf-8" if extension == "txt" else "text/csv; charset=utf-8" file_path = output_dir / f"{result_id}.{extension}" metadata_path = output_dir / f"{result_id}.json" - file_path.write_text(export_result.content, encoding="utf-8") + export_result = write_export( + file_path=file_path, + records=records, + union_keys=union_keys, + mode=mode, + output_format=output_format, + preview_record_limit=preview_record_limit, + ) metadata = ResultMetadata( result_id=result_id, file_path=str(file_path), @@ -31,7 +45,7 @@ def persist_result(output_dir: Path, export_result: ExportResult) -> ResultMetad mimetype=mimetype, ) metadata_path.write_text(json.dumps(asdict(metadata)), encoding="utf-8") - return metadata + return metadata, export_result def load_result_metadata(output_dir: Path, result_id: str) -> dict[str, str] | None: diff --git a/tests/test_parser.py b/tests/test_parser.py index 940861b..b1f1a57 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -74,3 +74,19 @@ def test_parse_log_file_rebuilds_record_after_embedded_newlines(): assert records[0]["msg"] == "hellobroken-fragmentworld" assert records[0]["action"] == "Alert" assert records[1]["msg"] == "next" + + +def test_parse_log_file_does_not_require_full_stream_read(): + class NoFullReadBytesIO(io.BytesIO): + def read(self, size=-1): + if size == -1: + raise AssertionError("full stream read should not be used") + return super().read(size) + + stream = NoFullReadBytesIO( + b'v015xxxxdate=2024-02-15 time=09:10:11 policy="Strict Policy" msg="blocked request"\n' + ) + + records, _union_keys = parse_log_file(stream) + + assert records[0]["policy"] == "Strict Policy" diff --git a/tests/test_processing.py b/tests/test_processing.py index b961dfc..a307f18 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -16,7 +16,7 @@ def test_filter_records_supports_case_insensitive_filters(): mode="vendor", ) - filtered = filter_records(records, options) + filtered = list(filter_records(records, options)) assert filtered == [{"policy": "ProdPolicy", "severity_level": "HIGH"}] diff --git a/tests/test_storage.py b/tests/test_storage.py new file mode 100644 index 0000000..003eb3b --- /dev/null +++ b/tests/test_storage.py @@ -0,0 +1,34 @@ +from pathlib import Path + +from app.services.storage import persist_result + + +def test_persist_result_writes_csv_and_collects_preview(tmp_path: Path): + metadata, export_result = persist_result( + output_dir=tmp_path, + records=[ + { + "v015xxxxdate": "2024-05-01", + "time": "10:00:00", + "policy": "Prod Policy", + "severity_level": "high", + }, + { + "v015xxxxdate": "2024-05-02", + "time": "11:00:00", + "policy": "Other Policy", + "severity_level": "low", + }, + ], + union_keys=["v015xxxxdate", "time", "policy", "severity_level"], + mode="full", + output_format="csv", + preview_record_limit=1, + ) + + written = Path(metadata.file_path).read_text(encoding="utf-8") + + assert metadata.download_name == "waf-report.csv" + assert "v015xxxxdate,time,policy,severity_level" in written + assert "2024-05-01,10:00:00,Prod Policy,high" in written + assert export_result.preview(1).count("\n") == 1