Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 846a22c047 | |||
| b8069d6771 | |||
| 93cebeb002 | |||
| f9f792f6a1 |
@@ -91,6 +91,9 @@ Compose settings are stored in `env`. Update that file to change values such as:
|
|||||||
- `SECRET_KEY`
|
- `SECRET_KEY`
|
||||||
- `MAX_UPLOAD_SIZE_MB`
|
- `MAX_UPLOAD_SIZE_MB`
|
||||||
- `OUTPUT_DIRECTORY`
|
- `OUTPUT_DIRECTORY`
|
||||||
|
- `OUTPUT_RETENTION_HOURS`
|
||||||
|
- `CLEANUP_ON_STARTUP`
|
||||||
|
- `CLEANUP_AFTER_DOWNLOAD`
|
||||||
|
|
||||||
### Run the test suite in a container
|
### Run the test suite in a container
|
||||||
|
|
||||||
@@ -126,8 +129,14 @@ curl -X POST http://127.0.0.1:5000/convert \
|
|||||||
## Notes
|
## Notes
|
||||||
|
|
||||||
- Temporary output files are written to `instance/outputs`
|
- Temporary output files are written to `instance/outputs`
|
||||||
|
- Generated files are cleaned up according to the configured output retention policy
|
||||||
- The application does not require a database
|
- The application does not require a database
|
||||||
- Gunicorn is used as the production WSGI server
|
- Gunicorn is used as the production WSGI server
|
||||||
|
- Parsing and export writing are streamed to reduce memory usage on large uploads
|
||||||
|
- Sorting still materializes the filtered record set because global ordering by datetime or severity requires the full filtered input
|
||||||
- Default upload limit is 100 MiB
|
- Default upload limit is 100 MiB
|
||||||
- Set `MAX_UPLOAD_SIZE_MB` to configure the upload limit in megabytes
|
- Set `MAX_UPLOAD_SIZE_MB` to configure the upload limit in megabytes
|
||||||
- `MAX_CONTENT_LENGTH` is also supported as a lower-level byte-based override
|
- `MAX_CONTENT_LENGTH` is also supported as a lower-level byte-based override
|
||||||
|
- `OUTPUT_RETENTION_HOURS` controls how long generated output files are kept
|
||||||
|
- `CLEANUP_ON_STARTUP=true` removes expired generated files when the app starts
|
||||||
|
- `CLEANUP_AFTER_DOWNLOAD=true` deletes a result only after the response finishes sending
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ from werkzeug.exceptions import RequestEntityTooLarge
|
|||||||
|
|
||||||
from app.config import Config
|
from app.config import Config
|
||||||
from app.routes import main_blueprint
|
from app.routes import main_blueprint
|
||||||
|
from app.services.storage import cleanup_expired_outputs
|
||||||
|
|
||||||
|
|
||||||
def _format_size_limit(size_limit_bytes: int) -> str:
|
def _format_size_limit(size_limit_bytes: int) -> str:
|
||||||
@@ -27,6 +28,12 @@ def create_app(config_class: type[Config] = Config) -> Flask:
|
|||||||
app.config["OUTPUT_DIRECTORY"] = output_dir
|
app.config["OUTPUT_DIRECTORY"] = output_dir
|
||||||
output_dir.mkdir(parents=True, exist_ok=True)
|
output_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
if app.config.get("CLEANUP_ON_STARTUP", False):
|
||||||
|
cleanup_expired_outputs(
|
||||||
|
output_dir=output_dir,
|
||||||
|
retention_hours=app.config.get("OUTPUT_RETENTION_HOURS", 24),
|
||||||
|
)
|
||||||
|
|
||||||
app.register_blueprint(main_blueprint)
|
app.register_blueprint(main_blueprint)
|
||||||
|
|
||||||
@app.errorhandler(RequestEntityTooLarge)
|
@app.errorhandler(RequestEntityTooLarge)
|
||||||
|
|||||||
@@ -2,6 +2,14 @@ import os
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
def _get_bool_setting(name: str, default: bool) -> bool:
|
||||||
|
"""Parse conventional boolean environment values."""
|
||||||
|
value = os.environ.get(name)
|
||||||
|
if value is None:
|
||||||
|
return default
|
||||||
|
return value.strip().lower() in {"1", "true", "yes", "on"}
|
||||||
|
|
||||||
|
|
||||||
def _get_max_content_length() -> int:
|
def _get_max_content_length() -> int:
|
||||||
"""Resolve the upload size limit from environment settings."""
|
"""Resolve the upload size limit from environment settings."""
|
||||||
upload_limit_mb = os.environ.get("MAX_UPLOAD_SIZE_MB")
|
upload_limit_mb = os.environ.get("MAX_UPLOAD_SIZE_MB")
|
||||||
@@ -25,3 +33,6 @@ class Config:
|
|||||||
OUTPUT_DIRECTORY = Path(
|
OUTPUT_DIRECTORY = Path(
|
||||||
os.environ.get("OUTPUT_DIRECTORY", Path("instance") / "outputs")
|
os.environ.get("OUTPUT_DIRECTORY", Path("instance") / "outputs")
|
||||||
)
|
)
|
||||||
|
OUTPUT_RETENTION_HOURS = int(os.environ.get("OUTPUT_RETENTION_HOURS", 24))
|
||||||
|
CLEANUP_ON_STARTUP = _get_bool_setting("CLEANUP_ON_STARTUP", True)
|
||||||
|
CLEANUP_AFTER_DOWNLOAD = _get_bool_setting("CLEANUP_AFTER_DOWNLOAD", False)
|
||||||
|
|||||||
+25
-22
@@ -12,17 +12,13 @@ from flask import (
|
|||||||
url_for,
|
url_for,
|
||||||
)
|
)
|
||||||
from werkzeug.datastructures import FileStorage
|
from werkzeug.datastructures import FileStorage
|
||||||
|
from werkzeug.wsgi import ClosingIterator
|
||||||
|
|
||||||
from app.constants import MODES, OUTPUT_FORMATS, SORTABLE_FIELDS, SORT_ORDERS
|
from app.constants import MODES, OUTPUT_FORMATS, SORTABLE_FIELDS, SORT_ORDERS
|
||||||
from app.services.exporter import build_export
|
from app.services.conversion import convert_uploaded_log
|
||||||
from app.services.parser import LogParseError, parse_log_file
|
from app.services.parser import LogParseError
|
||||||
from app.services.processing import (
|
from app.services.processing import ProcessingError, ProcessingOptions
|
||||||
ProcessingError,
|
from app.services.storage import delete_result_files, load_result_metadata
|
||||||
ProcessingOptions,
|
|
||||||
filter_records,
|
|
||||||
sort_records,
|
|
||||||
)
|
|
||||||
from app.services.storage import load_result_metadata, persist_result
|
|
||||||
|
|
||||||
main_blueprint = Blueprint("main", __name__)
|
main_blueprint = Blueprint("main", __name__)
|
||||||
|
|
||||||
@@ -95,7 +91,6 @@ def convert():
|
|||||||
assert uploaded_file is not None
|
assert uploaded_file is not None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
records, union_keys = parse_log_file(uploaded_file.stream)
|
|
||||||
options = ProcessingOptions(
|
options = ProcessingOptions(
|
||||||
policy_cs=form.policy_cs,
|
policy_cs=form.policy_cs,
|
||||||
policy_ci=form.policy_ci,
|
policy_ci=form.policy_ci,
|
||||||
@@ -105,12 +100,12 @@ def convert():
|
|||||||
order=form.order,
|
order=form.order,
|
||||||
mode=form.mode,
|
mode=form.mode,
|
||||||
)
|
)
|
||||||
filtered_records = filter_records(records, options)
|
conversion_result = convert_uploaded_log(
|
||||||
sorted_records = sort_records(filtered_records, options)
|
stream=uploaded_file.stream,
|
||||||
export_result = build_export(sorted_records, union_keys, form.mode, form.output_format)
|
options=options,
|
||||||
metadata = persist_result(
|
|
||||||
output_dir=current_app.config["OUTPUT_DIRECTORY"],
|
output_dir=current_app.config["OUTPUT_DIRECTORY"],
|
||||||
export_result=export_result,
|
output_format=form.output_format,
|
||||||
|
preview_record_limit=current_app.config["PREVIEW_RECORD_LIMIT"],
|
||||||
)
|
)
|
||||||
except (LogParseError, ProcessingError) as exc:
|
except (LogParseError, ProcessingError) as exc:
|
||||||
flash(str(exc), "danger")
|
flash(str(exc), "danger")
|
||||||
@@ -122,15 +117,16 @@ def convert():
|
|||||||
)
|
)
|
||||||
return render_template("index.html", form=form), 400
|
return render_template("index.html", form=form), 400
|
||||||
|
|
||||||
preview_limit = current_app.config["PREVIEW_RECORD_LIMIT"]
|
|
||||||
return render_template(
|
return render_template(
|
||||||
"result.html",
|
"result.html",
|
||||||
result_id=metadata.result_id,
|
result_id=conversion_result.metadata.result_id,
|
||||||
preview_text=export_result.preview(preview_limit),
|
preview_text=conversion_result.export_result.preview(
|
||||||
|
current_app.config["PREVIEW_RECORD_LIMIT"]
|
||||||
|
),
|
||||||
output_format=form.output_format,
|
output_format=form.output_format,
|
||||||
record_count=len(sorted_records),
|
record_count=conversion_result.filtered_count,
|
||||||
parsed_count=len(records),
|
parsed_count=conversion_result.parsed_count,
|
||||||
filtered_count=len(sorted_records),
|
filtered_count=conversion_result.filtered_count,
|
||||||
mode=form.mode,
|
mode=form.mode,
|
||||||
sort_by=form.sort_by,
|
sort_by=form.sort_by,
|
||||||
order=form.order,
|
order=form.order,
|
||||||
@@ -144,10 +140,17 @@ def download(result_id: str):
|
|||||||
flash("Requested output file could not be found.", "danger")
|
flash("Requested output file could not be found.", "danger")
|
||||||
return redirect(url_for("main.index"))
|
return redirect(url_for("main.index"))
|
||||||
|
|
||||||
return send_file(
|
response = send_file(
|
||||||
Path(metadata["file_path"]),
|
Path(metadata["file_path"]),
|
||||||
as_attachment=True,
|
as_attachment=True,
|
||||||
download_name=metadata["download_name"],
|
download_name=metadata["download_name"],
|
||||||
mimetype=metadata["mimetype"],
|
mimetype=metadata["mimetype"],
|
||||||
max_age=0,
|
max_age=0,
|
||||||
)
|
)
|
||||||
|
if current_app.config.get("CLEANUP_AFTER_DOWNLOAD", False):
|
||||||
|
output_dir = current_app.config["OUTPUT_DIRECTORY"]
|
||||||
|
response.response = ClosingIterator(
|
||||||
|
response.response,
|
||||||
|
[lambda: delete_result_files(output_dir=output_dir, result_id=result_id)],
|
||||||
|
)
|
||||||
|
return response
|
||||||
|
|||||||
@@ -0,0 +1,47 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from app.services.exporter import ExportResult
|
||||||
|
from app.services.parser import create_parse_session
|
||||||
|
from app.services.processing import ProcessingOptions, filter_records, sort_records
|
||||||
|
from app.services.storage import ResultMetadata, persist_result
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(slots=True)
class ConversionResult:
    """Outcome of one upload conversion, as handed back to the caller."""

    # Metadata returned by `persist_result` for the written export.
    metadata: ResultMetadata
    # Export summary returned by `persist_result` alongside the metadata.
    export_result: ExportResult
    # The parse session's `parsed_count` once conversion has finished.
    parsed_count: int
    # Number of records left after filtering and sorting.
    filtered_count: int
|
||||||
|
|
||||||
|
|
||||||
|
def convert_uploaded_log(
    stream,
    options: ProcessingOptions,
    output_dir: Path,
    output_format: str,
    preview_record_limit: int,
) -> ConversionResult:
    """Run the parse, filter, sort and persist pipeline for one uploaded log.

    Records are pulled lazily from the parse session through the filter. The
    sort step then collects the surviving records, because global ordering by
    datetime or severity needs the complete filtered set. The sorted records
    are handed to ``persist_result`` together with the union of keys seen
    while parsing.
    """
    session = create_parse_session(stream)
    matching = filter_records(session.iter_records(), options)
    # The key union and parsed count are read only after sorting has
    # materialized the filtered records.
    ordered = sort_records(matching, options)

    stored_metadata, stored_export = persist_result(
        output_dir=output_dir,
        records=ordered,
        union_keys=session.union_keys(),
        mode=options.mode,
        output_format=output_format,
        preview_record_limit=preview_record_limit,
    )

    return ConversionResult(
        metadata=stored_metadata,
        export_result=stored_export,
        parsed_count=session.parsed_count,
        filtered_count=len(ordered),
    )
|
||||||
+72
-34
@@ -1,69 +1,107 @@
|
|||||||
import csv
|
import csv
|
||||||
import io
|
import io
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Sequence, TextIO
|
||||||
|
|
||||||
from app.constants import VENDOR_FIELDS
|
from app.constants import VENDOR_FIELDS
|
||||||
|
|
||||||
|
|
||||||
@dataclass(slots=True)
|
@dataclass(slots=True)
|
||||||
class ExportResult:
|
class ExportResult:
|
||||||
content: str
|
|
||||||
columns: list[str]
|
columns: list[str]
|
||||||
output_format: str
|
output_format: str
|
||||||
|
preview_text: str
|
||||||
|
|
||||||
def preview(self, record_limit: int) -> str:
|
def preview(self, _record_limit: int) -> str:
|
||||||
"""Build a small preview string for the result page."""
|
"""Return the preview that was collected during export writing."""
|
||||||
if self.output_format == "text":
|
return self.preview_text
|
||||||
marker = f"--- record {record_limit + 1} ---"
|
|
||||||
if marker in self.content:
|
|
||||||
return self.content.split(marker, 1)[0].rstrip()
|
|
||||||
return self.content
|
|
||||||
|
|
||||||
lines = self.content.splitlines()
|
|
||||||
if len(lines) <= record_limit + 1:
|
|
||||||
return self.content
|
|
||||||
return "\n".join(lines[: record_limit + 1])
|
|
||||||
|
|
||||||
|
|
||||||
def build_export(
|
def write_export(
|
||||||
records: list[dict[str, str]],
|
file_path: Path,
|
||||||
|
records: Sequence[dict[str, str]],
|
||||||
union_keys: list[str],
|
union_keys: list[str],
|
||||||
mode: str,
|
mode: str,
|
||||||
output_format: str,
|
output_format: str,
|
||||||
|
preview_record_limit: int,
|
||||||
) -> ExportResult:
|
) -> ExportResult:
|
||||||
|
"""Write the final export directly to disk and keep only a small preview in memory."""
|
||||||
columns = VENDOR_FIELDS if mode == "vendor" else union_keys
|
columns = VENDOR_FIELDS if mode == "vendor" else union_keys
|
||||||
|
|
||||||
if output_format == "text":
|
with file_path.open("w", encoding="utf-8", newline="") as export_file:
|
||||||
return ExportResult(
|
if output_format == "text":
|
||||||
content=_render_text(records, columns),
|
preview_text = _write_text(
|
||||||
columns=columns,
|
export_file=export_file,
|
||||||
output_format=output_format,
|
records=records,
|
||||||
)
|
columns=columns,
|
||||||
|
preview_record_limit=preview_record_limit,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
preview_text = _write_csv(
|
||||||
|
export_file=export_file,
|
||||||
|
records=records,
|
||||||
|
columns=columns,
|
||||||
|
preview_record_limit=preview_record_limit,
|
||||||
|
)
|
||||||
|
|
||||||
return ExportResult(
|
return ExportResult(
|
||||||
content=_render_csv(records, columns),
|
|
||||||
columns=columns,
|
columns=columns,
|
||||||
output_format=output_format,
|
output_format=output_format,
|
||||||
|
preview_text=preview_text,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _render_text(records: list[dict[str, str]], columns: list[str]) -> str:
|
def _write_text(
|
||||||
|
export_file: TextIO,
|
||||||
|
records: Sequence[dict[str, str]],
|
||||||
|
columns: list[str],
|
||||||
|
preview_record_limit: int,
|
||||||
|
) -> str:
|
||||||
max_key_length = max((len(column) for column in columns), default=0)
|
max_key_length = max((len(column) for column in columns), default=0)
|
||||||
chunks: list[str] = []
|
preview_lines: list[str] = []
|
||||||
|
wrote_line = False
|
||||||
|
|
||||||
for index, record in enumerate(records, start=1):
|
for index, record in enumerate(records, start=1):
|
||||||
chunks.append(f"--- record {index} ---")
|
header = f"--- record {index} ---"
|
||||||
|
wrote_line = _write_line(export_file, header, wrote_line)
|
||||||
|
if index <= preview_record_limit:
|
||||||
|
preview_lines.append(header)
|
||||||
|
|
||||||
for column in columns:
|
for column in columns:
|
||||||
value = record.get(column, "")
|
line = f" {column.ljust(max_key_length)} = {record.get(column, '')}"
|
||||||
chunks.append(f" {column.ljust(max_key_length)} = {value}")
|
wrote_line = _write_line(export_file, line, wrote_line)
|
||||||
|
if index <= preview_record_limit:
|
||||||
|
preview_lines.append(line)
|
||||||
|
|
||||||
return "\n".join(chunks)
|
return "\n".join(preview_lines)
|
||||||
|
|
||||||
|
|
||||||
def _render_csv(records: list[dict[str, str]], columns: list[str]) -> str:
|
def _write_csv(
|
||||||
buffer = io.StringIO()
|
export_file: TextIO,
|
||||||
writer = csv.DictWriter(buffer, fieldnames=columns, extrasaction="ignore")
|
records: Sequence[dict[str, str]],
|
||||||
|
columns: list[str],
|
||||||
|
preview_record_limit: int,
|
||||||
|
) -> str:
|
||||||
|
writer = csv.DictWriter(export_file, fieldnames=columns, extrasaction="ignore")
|
||||||
writer.writeheader()
|
writer.writeheader()
|
||||||
for record in records:
|
|
||||||
writer.writerow({column: record.get(column, "") for column in columns})
|
preview_buffer = io.StringIO()
|
||||||
return buffer.getvalue()
|
preview_writer = csv.DictWriter(preview_buffer, fieldnames=columns, extrasaction="ignore")
|
||||||
|
preview_writer.writeheader()
|
||||||
|
|
||||||
|
for index, record in enumerate(records, start=1):
|
||||||
|
row = {column: record.get(column, "") for column in columns}
|
||||||
|
writer.writerow(row)
|
||||||
|
if index <= preview_record_limit:
|
||||||
|
preview_writer.writerow(row)
|
||||||
|
|
||||||
|
return preview_buffer.getvalue().rstrip("\n")
|
||||||
|
|
||||||
|
|
||||||
|
def _write_line(export_file: TextIO, line: str, wrote_line: bool) -> bool:
|
||||||
|
"""Write lines without leaving a trailing newline at the end of the file."""
|
||||||
|
if wrote_line:
|
||||||
|
export_file.write("\n")
|
||||||
|
export_file.write(line)
|
||||||
|
return True
|
||||||
|
|||||||
+111
-32
@@ -1,4 +1,6 @@
|
|||||||
|
import codecs
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
from dataclasses import dataclass, field
|
||||||
from io import BufferedIOBase, TextIOBase
|
from io import BufferedIOBase, TextIOBase
|
||||||
import re
|
import re
|
||||||
|
|
||||||
@@ -10,18 +12,35 @@ class LogParseError(ValueError):
|
|||||||
"""Raised when the uploaded log file cannot be parsed."""
|
"""Raised when the uploaded log file cannot be parsed."""
|
||||||
|
|
||||||
|
|
||||||
def _decode_log_content(raw_bytes: bytes | str) -> str:
|
@dataclass(slots=True)
|
||||||
"""Decode uploaded log content using practical text encodings seen in exports."""
|
class ParseSession:
|
||||||
if isinstance(raw_bytes, str):
|
"""Stateful streamed parser for uploaded log files."""
|
||||||
return raw_bytes
|
|
||||||
|
|
||||||
for encoding in ("utf-8-sig", "cp1252", "latin-1"):
|
stream: BufferedIOBase | TextIOBase
|
||||||
try:
|
encoding: str | None
|
||||||
return raw_bytes.decode(encoding)
|
_union_keys: OrderedDict[str, None] = field(default_factory=OrderedDict)
|
||||||
except UnicodeDecodeError:
|
parsed_count: int = 0
|
||||||
continue
|
_consumed: bool = False
|
||||||
|
|
||||||
raise UnicodeDecodeError("unknown", b"", 0, 1, "Unsupported text encoding.")
|
def iter_records(self):
|
||||||
|
if self._consumed:
|
||||||
|
raise RuntimeError("ParseSession records can only be consumed once.")
|
||||||
|
|
||||||
|
self._consumed = True
|
||||||
|
for line_number, line in _iter_logical_records(_iter_physical_lines(self.stream, self.encoding)):
|
||||||
|
record = _parse_record(line, line_number)
|
||||||
|
for key in record:
|
||||||
|
self._union_keys.setdefault(key, None)
|
||||||
|
self.parsed_count += 1
|
||||||
|
yield record
|
||||||
|
|
||||||
|
def union_keys(self) -> list[str]:
|
||||||
|
return list(self._union_keys.keys())
|
||||||
|
|
||||||
|
|
||||||
|
def create_parse_session(stream: BufferedIOBase | TextIOBase) -> ParseSession:
    """Build a ``ParseSession`` for ``stream`` with its encoding already resolved.

    The encoding is determined up front via ``_resolve_stream_encoding``; no
    records are parsed until the session's records are iterated.
    """
    detected_encoding = _resolve_stream_encoding(stream)
    return ParseSession(stream=stream, encoding=detected_encoding)
|
||||||
|
|
||||||
|
|
||||||
def _normalize_value(value: str) -> str:
|
def _normalize_value(value: str) -> str:
|
||||||
@@ -34,6 +53,80 @@ def _normalize_value(value: str) -> str:
|
|||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_stream_encoding(stream: BufferedIOBase | TextIOBase) -> str | None:
|
||||||
|
"""Detect the most suitable stream encoding without reading the full file into memory."""
|
||||||
|
probe = stream.read(0)
|
||||||
|
if isinstance(probe, str):
|
||||||
|
return None
|
||||||
|
|
||||||
|
for encoding in ("utf-8-sig", "cp1252", "latin-1"):
|
||||||
|
try:
|
||||||
|
_validate_stream_encoding(stream, encoding)
|
||||||
|
return encoding
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
raise UnicodeDecodeError("unknown", b"", 0, 1, "Unsupported text encoding.")
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_stream_encoding(stream: BufferedIOBase | TextIOBase, encoding: str) -> None:
    """Check that ``encoding`` can decode every byte of ``stream``.

    The stream is rewound, fed through an incremental decoder in 64 KiB chunks,
    and rewound again on success. Nothing is returned; an undecodable sequence
    surfaces as ``UnicodeDecodeError`` from the decoder.
    """
    chunk_size = 64 * 1024
    _rewind_stream(stream)
    incremental = codecs.getincrementaldecoder(encoding)()

    def read_chunk():
        return stream.read(chunk_size)

    for block in iter(read_chunk, b""):
        incremental.decode(block, final=False)
    # Flush with final=True so an incomplete sequence at end of input is reported.
    incremental.decode(b"", final=True)
    _rewind_stream(stream)
|
||||||
|
|
||||||
|
|
||||||
|
def _iter_physical_lines(
    stream: BufferedIOBase | TextIOBase,
    encoding: str | None,
):
    """Yield ``(line_number, line)`` pairs from the stream without full-file buffering.

    Args:
        stream: Upload stream; it is rewound before reading.
        encoding: Encoding used to decode a byte stream, or ``None`` when the
            stream already yields text and can be iterated directly.

    Yields:
        1-based line numbers paired with the line text. Decoded lines keep
        their trailing newline; a final unterminated line is yielded as-is.
    """
    _rewind_stream(stream)

    if encoding is None:
        yield from enumerate(stream, start=1)
        return

    decoder = codecs.getincrementaldecoder(encoding)()

    def decoded_chunks():
        for chunk in iter(lambda: stream.read(64 * 1024), b""):
            yield decoder.decode(chunk, final=False)
        # Flush whatever the decoder still holds once the stream is exhausted.
        yield decoder.decode(b"", final=True)

    line_number = 1
    pending = ""
    for text in decoded_chunks():
        if not text:
            continue
        # One split per chunk keeps the work linear; re-slicing the buffer after
        # every line would copy the rest of the chunk again for each line.
        *complete_lines, pending = (pending + text).split("\n")
        for line in complete_lines:
            yield line_number, line + "\n"
            line_number += 1

    if pending:
        yield line_number, pending
|
||||||
|
|
||||||
|
|
||||||
|
def _rewind_stream(stream: BufferedIOBase | TextIOBase) -> None:
|
||||||
|
"""Move the uploaded stream back to the start."""
|
||||||
|
if not hasattr(stream, "seek"):
|
||||||
|
raise LogParseError("The uploaded file stream is not seekable.")
|
||||||
|
stream.seek(0)
|
||||||
|
|
||||||
|
|
||||||
def _parse_record(line: str, line_number: int) -> dict[str, str]:
|
def _parse_record(line: str, line_number: int) -> dict[str, str]:
|
||||||
"""Parse a logical record by locating `key=` boundaries instead of splitting on spaces."""
|
"""Parse a logical record by locating `key=` boundaries instead of splitting on spaces."""
|
||||||
matches = list(KEY_PATTERN.finditer(line))
|
matches = list(KEY_PATTERN.finditer(line))
|
||||||
@@ -58,20 +151,19 @@ def _parse_record(line: str, line_number: int) -> dict[str, str]:
|
|||||||
return record
|
return record
|
||||||
|
|
||||||
|
|
||||||
def _iter_logical_records(content: str) -> list[tuple[int, str]]:
|
def _iter_logical_records(physical_lines):
|
||||||
"""Rebuild logical records when embedded newlines split a single log entry."""
|
"""Rebuild logical records when embedded newlines split a single log entry."""
|
||||||
records: list[tuple[int, str]] = []
|
|
||||||
current_record: list[str] = []
|
current_record: list[str] = []
|
||||||
current_start_line: int | None = None
|
current_start_line: int | None = None
|
||||||
|
|
||||||
for line_number, raw_line in enumerate(content.splitlines(), start=1):
|
for line_number, raw_line in physical_lines:
|
||||||
line = raw_line.strip()
|
line = raw_line.strip()
|
||||||
if not line:
|
if not line:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if line.startswith(RECORD_PREFIX):
|
if line.startswith(RECORD_PREFIX):
|
||||||
if current_record and current_start_line is not None:
|
if current_record and current_start_line is not None:
|
||||||
records.append((current_start_line, "".join(current_record)))
|
yield current_start_line, "".join(current_record)
|
||||||
current_record = [line]
|
current_record = [line]
|
||||||
current_start_line = line_number
|
current_start_line = line_number
|
||||||
continue
|
continue
|
||||||
@@ -85,24 +177,11 @@ def _iter_logical_records(content: str) -> list[tuple[int, str]]:
|
|||||||
)
|
)
|
||||||
|
|
||||||
if current_record and current_start_line is not None:
|
if current_record and current_start_line is not None:
|
||||||
records.append((current_start_line, "".join(current_record)))
|
yield current_start_line, "".join(current_record)
|
||||||
|
|
||||||
return records
|
|
||||||
|
|
||||||
|
|
||||||
def parse_log_file(stream: BufferedIOBase | TextIOBase) -> tuple[list[dict[str, str]], list[str]]:
|
def parse_log_file(stream: BufferedIOBase | TextIOBase) -> tuple[list[dict[str, str]], list[str]]:
|
||||||
"""Parse a text log file where each line contains shell-like key/value tokens."""
|
"""Compatibility helper that still materializes all parsed records when needed."""
|
||||||
raw_bytes = stream.read()
|
session = create_parse_session(stream)
|
||||||
content = _decode_log_content(raw_bytes)
|
records = list(session.iter_records())
|
||||||
|
return records, session.union_keys()
|
||||||
records: list[dict[str, str]] = []
|
|
||||||
seen_keys: OrderedDict[str, None] = OrderedDict()
|
|
||||||
|
|
||||||
for line_number, line in _iter_logical_records(content):
|
|
||||||
record = _parse_record(line, line_number)
|
|
||||||
for key in record:
|
|
||||||
seen_keys.setdefault(key, None)
|
|
||||||
|
|
||||||
records.append(record)
|
|
||||||
|
|
||||||
return records, list(seen_keys.keys())
|
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from typing import Iterable
|
||||||
|
|
||||||
from app.constants import SEVERITY_RANKING
|
from app.constants import SEVERITY_RANKING
|
||||||
|
|
||||||
@@ -20,11 +21,9 @@ class ProcessingOptions:
|
|||||||
|
|
||||||
|
|
||||||
def filter_records(
|
def filter_records(
|
||||||
records: list[dict[str, str]], options: ProcessingOptions
|
records: Iterable[dict[str, str]], options: ProcessingOptions
|
||||||
) -> list[dict[str, str]]:
|
) -> Iterable[dict[str, str]]:
|
||||||
"""Apply user-selected filters to parsed records."""
|
"""Apply user-selected filters lazily to parsed records."""
|
||||||
filtered: list[dict[str, str]] = []
|
|
||||||
|
|
||||||
for record in records:
|
for record in records:
|
||||||
policy_value = record.get("policy", "")
|
policy_value = record.get("policy", "")
|
||||||
severity_value = record.get("severity_level", "")
|
severity_value = record.get("severity_level", "")
|
||||||
@@ -38,13 +37,11 @@ def filter_records(
|
|||||||
if options.severity_ci and options.severity_ci.lower() not in severity_value.lower():
|
if options.severity_ci and options.severity_ci.lower() not in severity_value.lower():
|
||||||
continue
|
continue
|
||||||
|
|
||||||
filtered.append(record)
|
yield record
|
||||||
|
|
||||||
return filtered
|
|
||||||
|
|
||||||
|
|
||||||
def sort_records(
|
def sort_records(
|
||||||
records: list[dict[str, str]], options: ProcessingOptions
|
records: Iterable[dict[str, str]], options: ProcessingOptions
|
||||||
) -> list[dict[str, str]]:
|
) -> list[dict[str, str]]:
|
||||||
"""Sort records by datetime or severity using the requested order."""
|
"""Sort records by datetime or severity using the requested order."""
|
||||||
reverse = options.order == "desc"
|
reverse = options.order == "desc"
|
||||||
|
|||||||
+72
-6
@@ -1,9 +1,10 @@
|
|||||||
import json
|
import json
|
||||||
import uuid
|
import uuid
|
||||||
from dataclasses import asdict, dataclass
|
from dataclasses import asdict, dataclass
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from app.services.exporter import ExportResult
|
from app.services.exporter import ExportResult, write_export
|
||||||
|
|
||||||
|
|
||||||
@dataclass(slots=True)
|
@dataclass(slots=True)
|
||||||
@@ -14,16 +15,36 @@ class ResultMetadata:
|
|||||||
mimetype: str
|
mimetype: str
|
||||||
|
|
||||||
|
|
||||||
def persist_result(output_dir: Path, export_result: ExportResult) -> ResultMetadata:
|
def _result_paths(output_dir: Path, result_id: str) -> tuple[Path, Path]:
|
||||||
|
"""Build the sidecar metadata and output file search pattern for a result id."""
|
||||||
|
metadata_path = output_dir / f"{result_id}.json"
|
||||||
|
return metadata_path, output_dir / f"{result_id}"
|
||||||
|
|
||||||
|
|
||||||
|
def persist_result(
|
||||||
|
output_dir: Path,
|
||||||
|
records: list[dict[str, str]],
|
||||||
|
union_keys: list[str],
|
||||||
|
mode: str,
|
||||||
|
output_format: str,
|
||||||
|
preview_record_limit: int,
|
||||||
|
) -> tuple[ResultMetadata, ExportResult]:
|
||||||
"""Persist generated output and sidecar metadata in a temporary directory."""
|
"""Persist generated output and sidecar metadata in a temporary directory."""
|
||||||
result_id = uuid.uuid4().hex
|
result_id = uuid.uuid4().hex
|
||||||
extension = "txt" if export_result.output_format == "text" else "csv"
|
extension = "txt" if output_format == "text" else "csv"
|
||||||
mimetype = "text/plain; charset=utf-8" if extension == "txt" else "text/csv; charset=utf-8"
|
mimetype = "text/plain; charset=utf-8" if extension == "txt" else "text/csv; charset=utf-8"
|
||||||
|
|
||||||
file_path = output_dir / f"{result_id}.{extension}"
|
file_path = output_dir / f"{result_id}.{extension}"
|
||||||
metadata_path = output_dir / f"{result_id}.json"
|
metadata_path = output_dir / f"{result_id}.json"
|
||||||
|
|
||||||
file_path.write_text(export_result.content, encoding="utf-8")
|
export_result = write_export(
|
||||||
|
file_path=file_path,
|
||||||
|
records=records,
|
||||||
|
union_keys=union_keys,
|
||||||
|
mode=mode,
|
||||||
|
output_format=output_format,
|
||||||
|
preview_record_limit=preview_record_limit,
|
||||||
|
)
|
||||||
metadata = ResultMetadata(
|
metadata = ResultMetadata(
|
||||||
result_id=result_id,
|
result_id=result_id,
|
||||||
file_path=str(file_path),
|
file_path=str(file_path),
|
||||||
@@ -31,13 +52,58 @@ def persist_result(output_dir: Path, export_result: ExportResult) -> ResultMetad
|
|||||||
mimetype=mimetype,
|
mimetype=mimetype,
|
||||||
)
|
)
|
||||||
metadata_path.write_text(json.dumps(asdict(metadata)), encoding="utf-8")
|
metadata_path.write_text(json.dumps(asdict(metadata)), encoding="utf-8")
|
||||||
return metadata
|
return metadata, export_result
|
||||||
|
|
||||||
|
|
||||||
def load_result_metadata(output_dir: Path, result_id: str) -> dict[str, str] | None:
|
def load_result_metadata(output_dir: Path, result_id: str) -> dict[str, str] | None:
|
||||||
"""Load sidecar metadata for a generated file."""
|
"""Load sidecar metadata for a generated file."""
|
||||||
metadata_path = output_dir / f"{result_id}.json"
|
metadata_path, _base_path = _result_paths(output_dir, result_id)
|
||||||
if not metadata_path.exists():
|
if not metadata_path.exists():
|
||||||
return None
|
return None
|
||||||
|
|
||||||
return json.loads(metadata_path.read_text(encoding="utf-8"))
|
return json.loads(metadata_path.read_text(encoding="utf-8"))
|
||||||
|
|
||||||
|
|
||||||
|
def delete_result_files(output_dir: Path, result_id: str) -> None:
    """Delete a generated output file and its metadata sidecar if they still exist.

    Args:
        output_dir: Directory holding the generated files.
        result_id: Identifier shared by the output file and its sidecar.
    """
    import glob

    metadata_path, base_path = _result_paths(output_dir, result_id)
    # Escape the id so glob metacharacters in it ("*", "?", "[") match
    # literally and cannot sweep up files belonging to other results.
    pattern = f"{glob.escape(base_path.name)}.*"
    for output_file in output_dir.glob(pattern):
        if output_file.name == metadata_path.name:
            continue
        output_file.unlink(missing_ok=True)
    # The sidecar is removed after the output files.
    metadata_path.unlink(missing_ok=True)
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_expired_outputs(output_dir: Path, retention_hours: int) -> int:
    """Delete generated output sets older than the configured retention window.

    Every ``*.json`` file in ``output_dir`` is treated as a metadata sidecar.
    A result is expired when the newest modification time across its sidecar
    and its recorded output file is older than ``retention_hours`` ago.
    Unreadable or malformed sidecars fall back to the sidecar's own file stem
    and modification time instead of aborting the sweep.

    Returns the number of result sets for which deletion was requested.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(hours=retention_hours)
    deleted_results = 0

    for metadata_path in output_dir.glob("*.json"):
        try:
            payload = json.loads(metadata_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            payload = {}
        if not isinstance(payload, dict):
            # Valid JSON that is not an object (list, string, number, null).
            payload = {}

        result_id = payload.get("result_id")
        if not isinstance(result_id, str) or not result_id:
            result_id = metadata_path.stem

        recorded_path = payload.get("file_path")
        file_path = Path(recorded_path) if isinstance(recorded_path, str) else None

        newest_mtime = _newest_mtime(metadata_path, file_path)
        if newest_mtime is None or newest_mtime >= cutoff:
            continue

        delete_result_files(output_dir=output_dir, result_id=result_id)
        deleted_results += 1

    return deleted_results
|
||||||
|
|
||||||
|
|
||||||
|
def _newest_mtime(metadata_path: Path, file_path: Path | None) -> datetime | None:
|
||||||
|
"""Return the newest modification time across the metadata and output file."""
|
||||||
|
mtimes: list[datetime] = []
|
||||||
|
if metadata_path.exists():
|
||||||
|
mtimes.append(datetime.fromtimestamp(metadata_path.stat().st_mtime, tz=timezone.utc))
|
||||||
|
if file_path is not None and file_path.exists():
|
||||||
|
mtimes.append(datetime.fromtimestamp(file_path.stat().st_mtime, tz=timezone.utc))
|
||||||
|
if not mtimes:
|
||||||
|
return None
|
||||||
|
return max(mtimes)
|
||||||
|
|||||||
@@ -1,3 +1,6 @@
|
|||||||
SECRET_KEY=change-me
|
SECRET_KEY=change-me
|
||||||
MAX_UPLOAD_SIZE_MB=120
|
MAX_UPLOAD_SIZE_MB=120
|
||||||
OUTPUT_DIRECTORY=/app/instance/outputs
|
OUTPUT_DIRECTORY=/app/instance/outputs
|
||||||
|
OUTPUT_RETENTION_HOURS=24
|
||||||
|
CLEANUP_ON_STARTUP=true
|
||||||
|
CLEANUP_AFTER_DOWNLOAD=false
|
||||||
|
|||||||
@@ -12,6 +12,9 @@ class TestConfig:
|
|||||||
MAX_CONTENT_LENGTH = 100 * 1024 * 1024
|
MAX_CONTENT_LENGTH = 100 * 1024 * 1024
|
||||||
PREVIEW_RECORD_LIMIT = 5
|
PREVIEW_RECORD_LIMIT = 5
|
||||||
OUTPUT_DIRECTORY = "test-outputs"
|
OUTPUT_DIRECTORY = "test-outputs"
|
||||||
|
OUTPUT_RETENTION_HOURS = 24
|
||||||
|
CLEANUP_ON_STARTUP = False
|
||||||
|
CLEANUP_AFTER_DOWNLOAD = False
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
@pytest.fixture()
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
import io
|
import io
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from app import create_app
|
from app import create_app
|
||||||
|
|
||||||
@@ -132,6 +134,95 @@ def test_download_route_returns_generated_file(client):
|
|||||||
download_response.close()
|
download_response.close()
|
||||||
|
|
||||||
|
|
||||||
|
def test_download_route_can_cleanup_files_after_download(tmp_path):
    """With CLEANUP_AFTER_DOWNLOAD enabled, downloading a result removes its files."""

    class CleanupAfterDownloadConfig:
        TESTING = True
        SECRET_KEY = "test-secret"
        MAX_CONTENT_LENGTH = 100 * 1024 * 1024
        PREVIEW_RECORD_LIMIT = 5
        OUTPUT_DIRECTORY = tmp_path / "download-cleanup-outputs"
        OUTPUT_RETENTION_HOURS = 24
        CLEANUP_ON_STARTUP = False
        CLEANUP_AFTER_DOWNLOAD = True

    app = create_app(CleanupAfterDownloadConfig)
    client = app.test_client()

    # The context manager closes the upload buffer even if the request raises.
    with io.BytesIO(SAMPLE_LOG.encode("utf-8")) as log_file:
        convert_response = client.post(
            "/convert",
            data={
                "mode": "vendor",
                "output_format": "csv",
                "sort_by": "datetime",
                "order": "asc",
                "policy_cs": "",
                "policy_ci": "",
                "severity_cs": "",
                "severity_ci": "",
                "log_file": (log_file, "sample.log"),
            },
            content_type="multipart/form-data",
        )

    # Pull the result id out of the download link in the rendered page.
    html = convert_response.data.decode("utf-8")
    marker = "/download/"
    start = html.index(marker) + len(marker)
    end = html.index('"', start)
    result_id = html[start:end]

    output_dir = Path(app.config["OUTPUT_DIRECTORY"])
    metadata_path = output_dir / f"{result_id}.json"

    # Precondition: without this the final assertions could pass vacuously
    # if the sidecar had never been written at this location.
    assert metadata_path.exists()

    download_response = client.get(f"/download/{result_id}")
    download_response.close()
    convert_response.close()

    assert not metadata_path.exists()
    # The generated output file must be gone too, not just the sidecar.
    assert not list(output_dir.glob(f"{result_id}.*"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_cleanup_on_startup_removes_expired_outputs(tmp_path):
    """create_app with CLEANUP_ON_STARTUP removes results past the retention window."""
    # Function-local import, as before; hoisted so it is not buried mid-test.
    import os

    output_dir = tmp_path / "startup-cleanup-outputs"
    output_dir.mkdir(parents=True)
    result_id = "expired-result"
    file_path = output_dir / f"{result_id}.csv"
    metadata_path = output_dir / f"{result_id}.json"

    file_path.write_text("header\nvalue\n", encoding="utf-8")
    metadata_path.write_text(
        json.dumps(
            {
                "result_id": result_id,
                "file_path": str(file_path),
                "download_name": "waf-report.csv",
                "mimetype": "text/csv; charset=utf-8",
            }
        ),
        encoding="utf-8",
    )

    # Backdate both files (2000-01-01T00:00:00Z) so they fall outside the
    # one-hour retention window configured below.
    old_timestamp = 946684800
    for expired_path in (file_path, metadata_path):
        os.utime(expired_path, (old_timestamp, old_timestamp))

    class StartupCleanupConfig:
        TESTING = True
        SECRET_KEY = "test-secret"
        MAX_CONTENT_LENGTH = 100 * 1024 * 1024
        PREVIEW_RECORD_LIMIT = 5
        OUTPUT_DIRECTORY = output_dir
        OUTPUT_RETENTION_HOURS = 1
        CLEANUP_ON_STARTUP = True
        CLEANUP_AFTER_DOWNLOAD = False

    create_app(StartupCleanupConfig)

    assert not file_path.exists()
    assert not metadata_path.exists()
|
||||||
|
|
||||||
|
|
||||||
def test_default_upload_limit_is_100_mib(app):
    """The configured MAX_CONTENT_LENGTH equals 100 MiB expressed in bytes."""
    expected_bytes = 100 * 1024 * 1024
    assert app.config["MAX_CONTENT_LENGTH"] == expected_bytes
|
||||||
|
|
||||||
|
|||||||
@@ -74,3 +74,19 @@ def test_parse_log_file_rebuilds_record_after_embedded_newlines():
|
|||||||
assert records[0]["msg"] == "hellobroken-fragmentworld"
|
assert records[0]["msg"] == "hellobroken-fragmentworld"
|
||||||
assert records[0]["action"] == "Alert"
|
assert records[0]["action"] == "Alert"
|
||||||
assert records[1]["msg"] == "next"
|
assert records[1]["msg"] == "next"
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_log_file_does_not_require_full_stream_read():
    """parse_log_file must not slurp the whole upload with an unbounded read()."""

    class NoFullReadBytesIO(io.BytesIO):
        def read(self, size=-1):
            # read(), read(None) and any negative size all mean "read to EOF".
            if size is None or size < 0:
                raise AssertionError("full stream read should not be used")
            return super().read(size)

    stream = NoFullReadBytesIO(
        b'v015xxxxdate=2024-02-15 time=09:10:11 policy="Strict Policy" msg="blocked request"\n'
    )

    records, _union_keys = parse_log_file(stream)

    assert records[0]["policy"] == "Strict Policy"
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ def test_filter_records_supports_case_insensitive_filters():
|
|||||||
mode="vendor",
|
mode="vendor",
|
||||||
)
|
)
|
||||||
|
|
||||||
filtered = filter_records(records, options)
|
filtered = list(filter_records(records, options))
|
||||||
|
|
||||||
assert filtered == [{"policy": "ProdPolicy", "severity_level": "HIGH"}]
|
assert filtered == [{"policy": "ProdPolicy", "severity_level": "HIGH"}]
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,75 @@
|
|||||||
|
import json
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from app.services.storage import cleanup_expired_outputs, delete_result_files, persist_result
|
||||||
|
|
||||||
|
|
||||||
|
def test_persist_result_writes_csv_and_collects_preview(tmp_path: Path):
    """persist_result writes the expected CSV rows and returns a previewable export."""
    sample_records = [
        {
            "v015xxxxdate": "2024-05-01",
            "time": "10:00:00",
            "policy": "Prod Policy",
            "severity_level": "high",
        },
        {
            "v015xxxxdate": "2024-05-02",
            "time": "11:00:00",
            "policy": "Other Policy",
            "severity_level": "low",
        },
    ]
    columns = ["v015xxxxdate", "time", "policy", "severity_level"]

    metadata, export_result = persist_result(
        output_dir=tmp_path,
        records=sample_records,
        union_keys=columns,
        mode="full",
        output_format="csv",
        preview_record_limit=1,
    )

    csv_text = Path(metadata.file_path).read_text(encoding="utf-8")

    assert metadata.download_name == "waf-report.csv"
    assert "v015xxxxdate,time,policy,severity_level" in csv_text
    assert "2024-05-01,10:00:00,Prod Policy,high" in csv_text
    assert export_result.preview(1).count("\n") == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_result_files_removes_output_and_metadata(tmp_path: Path):
    """delete_result_files removes both the output file and its JSON sidecar."""
    result_id = "delete-me"
    contents_by_path = {
        tmp_path / f"{result_id}.txt": "content",
        tmp_path / f"{result_id}.json": "{}",
    }
    for path, text in contents_by_path.items():
        path.write_text(text, encoding="utf-8")

    delete_result_files(output_dir=tmp_path, result_id=result_id)

    for path in contents_by_path:
        assert not path.exists()
|
||||||
|
|
||||||
|
|
||||||
|
def test_cleanup_expired_outputs_removes_only_old_results(tmp_path: Path):
    """Only the result whose files are older than the retention window is removed."""

    def write_result(result_id: str, body: str) -> tuple[Path, Path]:
        # Create an output file plus the sidecar that points at it.
        output = tmp_path / f"{result_id}.csv"
        sidecar = tmp_path / f"{result_id}.json"
        output.write_text(body, encoding="utf-8")
        sidecar.write_text(
            json.dumps({"result_id": result_id, "file_path": str(output)}),
            encoding="utf-8",
        )
        return output, sidecar

    old_output, old_metadata = write_result("old-result", "old")
    new_output, new_metadata = write_result("new-result", "new")

    # Backdate only the "old" pair so it falls outside the one-hour window.
    old_timestamp = 946684800
    for stale_path in (old_output, old_metadata):
        os.utime(stale_path, (old_timestamp, old_timestamp))

    deleted_results = cleanup_expired_outputs(output_dir=tmp_path, retention_hours=1)

    assert deleted_results == 1
    assert not old_output.exists()
    assert not old_metadata.exists()
    assert new_output.exists()
    assert new_metadata.exists()
|
||||||
Reference in New Issue
Block a user