Compare commits

8 Commits

Author SHA1 Message Date
Alfredo Di Stasio b3c301e69e Merge branch 'feature/fix-invalid-datetime-sorting' into develop 2026-04-27 15:08:46 +02:00
Alfredo Di Stasio a2ab2674e3 Keep invalid datetimes at end of sort 2026-04-27 15:08:34 +02:00
Alfredo Di Stasio 3e370c25b6 Merge branch 'feature/harden-secret-key-config' into develop 2026-04-27 14:24:18 +02:00
Alfredo Di Stasio 41c63980f0 Harden secret key configuration 2026-04-27 14:23:13 +02:00
Alfredo Di Stasio 846a22c047 Merge branch 'feature/output-cleanup-policy' into develop 2026-04-27 14:18:03 +02:00
Alfredo Di Stasio b8069d6771 Add output cleanup policy 2026-04-27 14:17:44 +02:00
Alfredo Di Stasio 93cebeb002 Merge branch 'feature/reduce-memory-footprint' into develop 2026-04-27 12:42:38 +02:00
Alfredo Di Stasio f9f792f6a1 Reduce conversion memory footprint 2026-04-27 11:44:40 +02:00
16 changed files with 797 additions and 115 deletions
+37 -1
View File
@@ -42,6 +42,7 @@ pip install -e ".[dev]"
```bash
export FLASK_APP=wsgi.py
export APP_ENV=development
export MAX_UPLOAD_SIZE_MB=100
flask run --debug
```
@@ -73,7 +74,7 @@ docker build -t webfortilog .
### Run
```bash
docker run --rm -p 8000:8000 -e MAX_UPLOAD_SIZE_MB=100 webfortilog
docker run --rm -p 8000:8000 -e APP_ENV=development -e MAX_UPLOAD_SIZE_MB=100 webfortilog
```
Open `http://127.0.0.1:8000`.
@@ -89,8 +90,15 @@ docker compose up --build web
Compose settings are stored in `env`. Update that file to change values such as:
- `SECRET_KEY`
- `APP_ENV`
- `MAX_UPLOAD_SIZE_MB`
- `OUTPUT_DIRECTORY`
- `OUTPUT_RETENTION_HOURS`
- `CLEANUP_ON_STARTUP`
- `CLEANUP_AFTER_DOWNLOAD`
For local Docker Compose usage, `APP_ENV=development` allows an internal development-only fallback secret key.
For production-like environments, set a strong `SECRET_KEY` explicitly.
### Run the test suite in a container
@@ -126,8 +134,36 @@ curl -X POST http://127.0.0.1:5000/convert \
## Notes
- Temporary output files are written to `instance/outputs`
- Generated files are cleaned up according to the configured output retention policy
- The application does not require a database
- Gunicorn is used as the production WSGI server
- Parsing and export writing are streamed to reduce memory usage on large uploads
- Sorting still materializes the filtered record set because global ordering by datetime or severity requires the full filtered input
- Default upload limit is 100 MiB
- Set `MAX_UPLOAD_SIZE_MB` to configure the upload limit in megabytes
- `MAX_CONTENT_LENGTH` is also supported as a lower-level byte-based override
- `SECRET_KEY` is required in production-like environments and must not use placeholder values such as `change-me`
- Development-only fallback secret key behavior is enabled only when `APP_ENV=development`, or when `APP_ENV` is unset or empty and `FLASK_ENV=development` (`APP_ENV` takes precedence over `FLASK_ENV`)
- `OUTPUT_RETENTION_HOURS` controls how long generated output files are kept
- `CLEANUP_ON_STARTUP=true` removes expired generated files when the app starts
- `CLEANUP_AFTER_DOWNLOAD=true` deletes a result only after the response finishes sending
## Secure configuration example
### Production-like environment
```bash
python3 - <<'PY'
import secrets
print(secrets.token_urlsafe(48))
PY
```
Use the generated value as `SECRET_KEY`, for example:
```bash
docker run --rm -p 8000:8000 \
-e SECRET_KEY='replace-with-a-long-random-secret' \
-e MAX_UPLOAD_SIZE_MB=100 \
webfortilog
```
+9 -1
View File
@@ -3,8 +3,9 @@ from pathlib import Path
from flask import Flask, flash, render_template
from werkzeug.exceptions import RequestEntityTooLarge
from app.config import Config
from app.config import Config, validate_secret_key
from app.routes import main_blueprint
from app.services.storage import cleanup_expired_outputs
def _format_size_limit(size_limit_bytes: int) -> str:
@@ -20,6 +21,7 @@ def create_app(config_class: type[Config] = Config) -> Flask:
"""Application factory used by Flask and Gunicorn."""
app = Flask(__name__, instance_relative_config=True)
app.config.from_object(config_class)
validate_secret_key(app.config["SECRET_KEY"])
output_dir = Path(app.config["OUTPUT_DIRECTORY"])
if not output_dir.is_absolute():
@@ -27,6 +29,12 @@ def create_app(config_class: type[Config] = Config) -> Flask:
app.config["OUTPUT_DIRECTORY"] = output_dir
output_dir.mkdir(parents=True, exist_ok=True)
if app.config.get("CLEANUP_ON_STARTUP", False):
cleanup_expired_outputs(
output_dir=output_dir,
retention_hours=app.config.get("OUTPUT_RETENTION_HOURS", 24),
)
app.register_blueprint(main_blueprint)
@app.errorhandler(RequestEntityTooLarge)
+57 -1
View File
@@ -1,6 +1,23 @@
import os
from pathlib import Path
DEVELOPMENT_SECRET_KEY = "dev-secret-key-change-me"
UNSAFE_SECRET_KEYS = {
"",
"change-me",
"dev-secret-key-change-me",
"secret",
"default",
}
def _get_bool_setting(name: str, default: bool) -> bool:
    """Parse a conventional boolean environment value.

    Args:
        name: Name of the environment variable to read.
        default: Value returned when the variable is unset or blank.

    Returns:
        True when the value is one of ``1``, ``true``, ``yes`` or ``on``
        (case-insensitive, surrounding whitespace ignored); ``default`` when
        the variable is missing or blank; False for any other value.
    """
    value = os.environ.get(name)
    if value is None:
        return default

    normalized = value.strip().lower()
    if not normalized:
        # A blank assignment such as ``NAME=`` in an env file means "not
        # configured"; fall back to the default instead of silently
        # disabling a setting whose default is True.
        return default
    return normalized in {"1", "true", "yes", "on"}
def _get_max_content_length() -> int:
"""Resolve the upload size limit from environment settings."""
@@ -15,13 +32,52 @@ def _get_max_content_length() -> int:
return 100 * 1024 * 1024
def _get_app_env() -> str:
    """Resolve the effective application environment.

    ``APP_ENV`` takes precedence over ``FLASK_ENV``. A variable that is unset,
    empty, or whitespace-only is ignored so it cannot mask the next candidate.

    Returns:
        The lower-cased environment name, or ``"production"`` when neither
        variable carries a usable value.
    """
    for variable in ("APP_ENV", "FLASK_ENV"):
        # Strip before testing truthiness: a whitespace-only value would
        # otherwise win the lookup and then normalize to an empty name.
        candidate = os.environ.get(variable, "").strip()
        if candidate:
            return candidate.lower()
    return "production"
def _is_development_env() -> bool:
    """Tell whether the resolved application environment is ``development``."""
    effective_env = _get_app_env()
    return effective_env == "development"
def _get_secret_key() -> str:
    """Return the configured secret key, falling back only in development.

    An explicitly configured, non-blank ``SECRET_KEY`` always wins. Without
    one, the development fallback key is used in a development environment
    and an empty string is returned everywhere else.
    """
    configured = os.environ.get("SECRET_KEY", "").strip()
    if not configured and _is_development_env():
        return DEVELOPMENT_SECRET_KEY
    # Either the explicit key, or "" when nothing usable was configured.
    return configured
def validate_secret_key(secret_key: str | None) -> None:
    """Fail fast when a production-like environment uses an unsafe secret key.

    Args:
        secret_key: The configured key. ``None`` (no key configured at all)
            is treated like an empty key.

    Raises:
        RuntimeError: Outside development, when the key is missing or matches
            one of the known placeholder values in ``UNSAFE_SECRET_KEYS``.
    """
    if _is_development_env():
        return

    # Normalize only after the development bypass, and tolerate None so a
    # config without SECRET_KEY produces the explanatory RuntimeError below
    # rather than an AttributeError on ``None.strip()``.
    normalized = (secret_key or "").strip()
    if normalized.lower() in UNSAFE_SECRET_KEYS:
        raise RuntimeError(
            "SECRET_KEY is missing or unsafe for a production-like environment. "
            "Set SECRET_KEY to a long random value, or use APP_ENV=development only for local development."
        )
class Config:
    """Default configuration for local and container usage.

    Every value is resolved from the process environment once, when this
    module is imported.
    """

    # Empty outside development when SECRET_KEY is not set; the application
    # factory passes this value to validate_secret_key(), which rejects it.
    # The former hard-coded "dev-secret-key-change-me" default is gone.
    SECRET_KEY = _get_secret_key()
    # Default to 100 MiB so larger WAF exports can be processed without tuning.
    MAX_CONTENT_LENGTH = _get_max_content_length()
    PREVIEW_RECORD_LIMIT = int(os.environ.get("PREVIEW_RECORD_LIMIT", 5))
    OUTPUT_DIRECTORY = Path(
        os.environ.get("OUTPUT_DIRECTORY", Path("instance") / "outputs")
    )
    # Retention window, in hours, for generated output files.
    OUTPUT_RETENTION_HOURS = int(os.environ.get("OUTPUT_RETENTION_HOURS", 24))
    # Expired outputs are purged at startup unless explicitly disabled.
    CLEANUP_ON_STARTUP = _get_bool_setting("CLEANUP_ON_STARTUP", True)
    # Deleting a result right after it is downloaded is opt-in.
    CLEANUP_AFTER_DOWNLOAD = _get_bool_setting("CLEANUP_AFTER_DOWNLOAD", False)
+25 -22
View File
@@ -12,17 +12,13 @@ from flask import (
url_for,
)
from werkzeug.datastructures import FileStorage
from werkzeug.wsgi import ClosingIterator
from app.constants import MODES, OUTPUT_FORMATS, SORTABLE_FIELDS, SORT_ORDERS
from app.services.exporter import build_export
from app.services.parser import LogParseError, parse_log_file
from app.services.processing import (
ProcessingError,
ProcessingOptions,
filter_records,
sort_records,
)
from app.services.storage import load_result_metadata, persist_result
from app.services.conversion import convert_uploaded_log
from app.services.parser import LogParseError
from app.services.processing import ProcessingError, ProcessingOptions
from app.services.storage import delete_result_files, load_result_metadata
main_blueprint = Blueprint("main", __name__)
@@ -95,7 +91,6 @@ def convert():
assert uploaded_file is not None
try:
records, union_keys = parse_log_file(uploaded_file.stream)
options = ProcessingOptions(
policy_cs=form.policy_cs,
policy_ci=form.policy_ci,
@@ -105,12 +100,12 @@ def convert():
order=form.order,
mode=form.mode,
)
filtered_records = filter_records(records, options)
sorted_records = sort_records(filtered_records, options)
export_result = build_export(sorted_records, union_keys, form.mode, form.output_format)
metadata = persist_result(
conversion_result = convert_uploaded_log(
stream=uploaded_file.stream,
options=options,
output_dir=current_app.config["OUTPUT_DIRECTORY"],
export_result=export_result,
output_format=form.output_format,
preview_record_limit=current_app.config["PREVIEW_RECORD_LIMIT"],
)
except (LogParseError, ProcessingError) as exc:
flash(str(exc), "danger")
@@ -122,15 +117,16 @@ def convert():
)
return render_template("index.html", form=form), 400
preview_limit = current_app.config["PREVIEW_RECORD_LIMIT"]
return render_template(
"result.html",
result_id=metadata.result_id,
preview_text=export_result.preview(preview_limit),
result_id=conversion_result.metadata.result_id,
preview_text=conversion_result.export_result.preview(
current_app.config["PREVIEW_RECORD_LIMIT"]
),
output_format=form.output_format,
record_count=len(sorted_records),
parsed_count=len(records),
filtered_count=len(sorted_records),
record_count=conversion_result.filtered_count,
parsed_count=conversion_result.parsed_count,
filtered_count=conversion_result.filtered_count,
mode=form.mode,
sort_by=form.sort_by,
order=form.order,
@@ -144,10 +140,17 @@ def download(result_id: str):
flash("Requested output file could not be found.", "danger")
return redirect(url_for("main.index"))
return send_file(
response = send_file(
Path(metadata["file_path"]),
as_attachment=True,
download_name=metadata["download_name"],
mimetype=metadata["mimetype"],
max_age=0,
)
if current_app.config.get("CLEANUP_AFTER_DOWNLOAD", False):
output_dir = current_app.config["OUTPUT_DIRECTORY"]
response.response = ClosingIterator(
response.response,
[lambda: delete_result_files(output_dir=output_dir, result_id=result_id)],
)
return response
+47
View File
@@ -0,0 +1,47 @@
from dataclasses import dataclass
from pathlib import Path
from app.services.exporter import ExportResult
from app.services.parser import create_parse_session
from app.services.processing import ProcessingOptions, filter_records, sort_records
from app.services.storage import ResultMetadata, persist_result
@dataclass(slots=True)
class ConversionResult:
    """Outcome of one upload conversion, as returned by convert_uploaded_log."""

    # Sidecar metadata returned by persist_result for the written export.
    metadata: ResultMetadata
    # Export description returned by persist_result alongside the metadata.
    export_result: ExportResult
    # Value of the parse session's parsed_count once conversion has finished.
    parsed_count: int
    # Number of records left after filtering and sorting.
    filtered_count: int
def convert_uploaded_log(
    stream,
    options: ProcessingOptions,
    output_dir: Path,
    output_format: str,
    preview_record_limit: int,
) -> ConversionResult:
    """Turn an uploaded log into a persisted export plus a small preview.

    Records flow lazily from the parse session through the filter. Sorting is
    the one step that holds the filtered records in memory, because a global
    ordering by datetime or severity needs the complete filtered set.
    """
    session = create_parse_session(stream)

    # sort_records drains the lazy parse/filter pipeline, so the session's
    # key union and parsed counter are only read after this line.
    matching_records = filter_records(session.iter_records(), options)
    ordered_records = sort_records(matching_records, options)

    metadata, export_result = persist_result(
        output_dir=output_dir,
        records=ordered_records,
        union_keys=session.union_keys(),
        mode=options.mode,
        output_format=output_format,
        preview_record_limit=preview_record_limit,
    )

    return ConversionResult(
        metadata=metadata,
        export_result=export_result,
        parsed_count=session.parsed_count,
        filtered_count=len(ordered_records),
    )
+69 -31
View File
@@ -1,69 +1,107 @@
import csv
import io
from dataclasses import dataclass
from pathlib import Path
from typing import Sequence, TextIO
from app.constants import VENDOR_FIELDS
@dataclass(slots=True)
class ExportResult:
content: str
columns: list[str]
output_format: str
preview_text: str
def preview(self, record_limit: int) -> str:
"""Build a small preview string for the result page."""
if self.output_format == "text":
marker = f"--- record {record_limit + 1} ---"
if marker in self.content:
return self.content.split(marker, 1)[0].rstrip()
return self.content
lines = self.content.splitlines()
if len(lines) <= record_limit + 1:
return self.content
return "\n".join(lines[: record_limit + 1])
def preview(self, _record_limit: int) -> str:
"""Return the preview that was collected during export writing."""
return self.preview_text
def build_export(
records: list[dict[str, str]],
def write_export(
file_path: Path,
records: Sequence[dict[str, str]],
union_keys: list[str],
mode: str,
output_format: str,
preview_record_limit: int,
) -> ExportResult:
"""Write the final export directly to disk and keep only a small preview in memory."""
columns = VENDOR_FIELDS if mode == "vendor" else union_keys
with file_path.open("w", encoding="utf-8", newline="") as export_file:
if output_format == "text":
return ExportResult(
content=_render_text(records, columns),
preview_text = _write_text(
export_file=export_file,
records=records,
columns=columns,
output_format=output_format,
preview_record_limit=preview_record_limit,
)
else:
preview_text = _write_csv(
export_file=export_file,
records=records,
columns=columns,
preview_record_limit=preview_record_limit,
)
return ExportResult(
content=_render_csv(records, columns),
columns=columns,
output_format=output_format,
preview_text=preview_text,
)
def _render_text(records: list[dict[str, str]], columns: list[str]) -> str:
def _write_text(
export_file: TextIO,
records: Sequence[dict[str, str]],
columns: list[str],
preview_record_limit: int,
) -> str:
max_key_length = max((len(column) for column in columns), default=0)
chunks: list[str] = []
preview_lines: list[str] = []
wrote_line = False
for index, record in enumerate(records, start=1):
chunks.append(f"--- record {index} ---")
header = f"--- record {index} ---"
wrote_line = _write_line(export_file, header, wrote_line)
if index <= preview_record_limit:
preview_lines.append(header)
for column in columns:
value = record.get(column, "")
chunks.append(f" {column.ljust(max_key_length)} = {value}")
line = f" {column.ljust(max_key_length)} = {record.get(column, '')}"
wrote_line = _write_line(export_file, line, wrote_line)
if index <= preview_record_limit:
preview_lines.append(line)
return "\n".join(chunks)
return "\n".join(preview_lines)
def _render_csv(records: list[dict[str, str]], columns: list[str]) -> str:
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=columns, extrasaction="ignore")
def _write_csv(
    export_file: TextIO,
    records: Sequence[dict[str, str]],
    columns: list[str],
    preview_record_limit: int,
) -> str:
    """Stream records to ``export_file`` as CSV and return a short preview.

    Args:
        export_file: Open text file that receives the full export.
        records: Records to write, in output order.
        columns: Column names; keys absent from a record are written empty.
        preview_record_limit: Number of leading records kept in the preview.

    Returns:
        The CSV header plus the first ``preview_record_limit`` rows, without
        a trailing line terminator.
    """
    writer = csv.DictWriter(export_file, fieldnames=columns, extrasaction="ignore")
    writer.writeheader()

    preview_buffer = io.StringIO()
    preview_writer = csv.DictWriter(preview_buffer, fieldnames=columns, extrasaction="ignore")
    preview_writer.writeheader()

    for index, record in enumerate(records, start=1):
        row = {column: record.get(column, "") for column in columns}
        writer.writerow(row)
        if index <= preview_record_limit:
            preview_writer.writerow(row)

    # The csv module terminates rows with "\r\n" by default, so stripping only
    # "\n" would leave a stray carriage return at the end of the preview.
    return preview_buffer.getvalue().rstrip("\r\n")
def _write_line(export_file: TextIO, line: str, wrote_line: bool) -> bool:
    """Emit ``line``, separating it from any earlier line with a newline.

    The separator goes in front of each line after the first, so the finished
    file never ends with a newline. Always returns True, which the caller
    feeds back in as ``wrote_line`` on the next call.
    """
    if not wrote_line:
        # First line of the file: nothing to separate from.
        export_file.write(line)
        return True

    export_file.write("\n")
    export_file.write(line)
    return True
+111 -32
View File
@@ -1,4 +1,6 @@
import codecs
from collections import OrderedDict
from dataclasses import dataclass, field
from io import BufferedIOBase, TextIOBase
import re
@@ -10,18 +12,35 @@ class LogParseError(ValueError):
"""Raised when the uploaded log file cannot be parsed."""
def _decode_log_content(raw_bytes: bytes | str) -> str:
"""Decode uploaded log content using practical text encodings seen in exports."""
if isinstance(raw_bytes, str):
return raw_bytes
@dataclass(slots=True)
class ParseSession:
"""Stateful streamed parser for uploaded log files."""
for encoding in ("utf-8-sig", "cp1252", "latin-1"):
try:
return raw_bytes.decode(encoding)
except UnicodeDecodeError:
continue
stream: BufferedIOBase | TextIOBase
encoding: str | None
_union_keys: OrderedDict[str, None] = field(default_factory=OrderedDict)
parsed_count: int = 0
_consumed: bool = False
raise UnicodeDecodeError("unknown", b"", 0, 1, "Unsupported text encoding.")
def iter_records(self):
if self._consumed:
raise RuntimeError("ParseSession records can only be consumed once.")
self._consumed = True
for line_number, line in _iter_logical_records(_iter_physical_lines(self.stream, self.encoding)):
record = _parse_record(line, line_number)
for key in record:
self._union_keys.setdefault(key, None)
self.parsed_count += 1
yield record
def union_keys(self) -> list[str]:
return list(self._union_keys.keys())
def create_parse_session(stream: BufferedIOBase | TextIOBase) -> ParseSession:
    """Build a ParseSession for ``stream`` without loading the upload into memory.

    The stream's encoding is resolved up front and stored on the session.
    """
    detected_encoding = _resolve_stream_encoding(stream)
    return ParseSession(stream=stream, encoding=detected_encoding)
def _normalize_value(value: str) -> str:
@@ -34,6 +53,80 @@ def _normalize_value(value: str) -> str:
return value
def _resolve_stream_encoding(stream: BufferedIOBase | TextIOBase) -> str | None:
"""Detect the most suitable stream encoding without reading the full file into memory."""
probe = stream.read(0)
if isinstance(probe, str):
return None
for encoding in ("utf-8-sig", "cp1252", "latin-1"):
try:
_validate_stream_encoding(stream, encoding)
return encoding
except UnicodeDecodeError:
continue
raise UnicodeDecodeError("unknown", b"", 0, 1, "Unsupported text encoding.")
def _validate_stream_encoding(stream: BufferedIOBase | TextIOBase, encoding: str) -> None:
    """Check that ``encoding`` decodes the entire stream.

    The stream is read from the start in 64 KiB chunks through an incremental
    decoder, so a UnicodeDecodeError surfaces at the first undecodable byte
    sequence. On success the stream is rewound again before returning.
    """
    _rewind_stream(stream)
    decoder = codecs.getincrementaldecoder(encoding)()

    while True:
        chunk = stream.read(64 * 1024)
        if chunk == b"":
            break
        decoder.decode(chunk, final=False)

    # Flush the decoder so a truncated multi-byte sequence at EOF is reported.
    decoder.decode(b"", final=True)
    _rewind_stream(stream)
def _iter_physical_lines(
stream: BufferedIOBase | TextIOBase,
encoding: str | None,
):
"""Yield decoded physical lines from the uploaded stream without full-file buffering."""
_rewind_stream(stream)
if encoding is None:
for line_number, raw_line in enumerate(stream, start=1):
yield line_number, raw_line
return
line_number = 1
decoder = codecs.getincrementaldecoder(encoding)()
pending = ""
for chunk in iter(lambda: stream.read(64 * 1024), b""):
text = decoder.decode(chunk, final=False)
pending += text
while True:
newline_index = pending.find("\n")
if newline_index == -1:
break
line = pending[: newline_index + 1]
pending = pending[newline_index + 1 :]
yield line_number, line
line_number += 1
pending += decoder.decode(b"", final=True)
while True:
newline_index = pending.find("\n")
if newline_index == -1:
break
line = pending[: newline_index + 1]
pending = pending[newline_index + 1 :]
yield line_number, line
line_number += 1
if pending:
yield line_number, pending
def _rewind_stream(stream: BufferedIOBase | TextIOBase) -> None:
"""Move the uploaded stream back to the start."""
if not hasattr(stream, "seek"):
raise LogParseError("The uploaded file stream is not seekable.")
stream.seek(0)
def _parse_record(line: str, line_number: int) -> dict[str, str]:
"""Parse a logical record by locating `key=` boundaries instead of splitting on spaces."""
matches = list(KEY_PATTERN.finditer(line))
@@ -58,20 +151,19 @@ def _parse_record(line: str, line_number: int) -> dict[str, str]:
return record
def _iter_logical_records(content: str) -> list[tuple[int, str]]:
def _iter_logical_records(physical_lines):
"""Rebuild logical records when embedded newlines split a single log entry."""
records: list[tuple[int, str]] = []
current_record: list[str] = []
current_start_line: int | None = None
for line_number, raw_line in enumerate(content.splitlines(), start=1):
for line_number, raw_line in physical_lines:
line = raw_line.strip()
if not line:
continue
if line.startswith(RECORD_PREFIX):
if current_record and current_start_line is not None:
records.append((current_start_line, "".join(current_record)))
yield current_start_line, "".join(current_record)
current_record = [line]
current_start_line = line_number
continue
@@ -85,24 +177,11 @@ def _iter_logical_records(content: str) -> list[tuple[int, str]]:
)
if current_record and current_start_line is not None:
records.append((current_start_line, "".join(current_record)))
return records
yield current_start_line, "".join(current_record)
def parse_log_file(stream: BufferedIOBase | TextIOBase) -> tuple[list[dict[str, str]], list[str]]:
"""Parse a text log file where each line contains shell-like key/value tokens."""
raw_bytes = stream.read()
content = _decode_log_content(raw_bytes)
records: list[dict[str, str]] = []
seen_keys: OrderedDict[str, None] = OrderedDict()
for line_number, line in _iter_logical_records(content):
record = _parse_record(line, line_number)
for key in record:
seen_keys.setdefault(key, None)
records.append(record)
return records, list(seen_keys.keys())
"""Compatibility helper that still materializes all parsed records when needed."""
session = create_parse_session(stream)
records = list(session.iter_records())
return records, session.union_keys()
+29 -15
View File
@@ -1,5 +1,6 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable
from app.constants import SEVERITY_RANKING
@@ -20,11 +21,9 @@ class ProcessingOptions:
def filter_records(
records: list[dict[str, str]], options: ProcessingOptions
) -> list[dict[str, str]]:
"""Apply user-selected filters to parsed records."""
filtered: list[dict[str, str]] = []
records: Iterable[dict[str, str]], options: ProcessingOptions
) -> Iterable[dict[str, str]]:
"""Apply user-selected filters lazily to parsed records."""
for record in records:
policy_value = record.get("policy", "")
severity_value = record.get("severity_level", "")
@@ -38,19 +37,17 @@ def filter_records(
if options.severity_ci and options.severity_ci.lower() not in severity_value.lower():
continue
filtered.append(record)
return filtered
yield record
def sort_records(
records: list[dict[str, str]], options: ProcessingOptions
records: Iterable[dict[str, str]], options: ProcessingOptions
) -> list[dict[str, str]]:
"""Sort records by datetime or severity using the requested order."""
reverse = options.order == "desc"
if options.sort_by == "datetime":
key_func = _datetime_key
return _sort_records_by_datetime(records, reverse)
elif options.sort_by == "severity":
key_func = _severity_key
else:
@@ -59,17 +56,34 @@ def sort_records(
return sorted(records, key=key_func, reverse=reverse)
def _datetime_key(record: dict[str, str]) -> tuple[int, datetime]:
def _sort_records_by_datetime(
    records: Iterable[dict[str, str]], reverse: bool
) -> list[dict[str, str]]:
    """Order records by parsed datetime, keeping unparseable ones at the end.

    Records with a valid datetime are sorted in the requested direction.
    Records whose datetime is missing or invalid follow them in their
    original order, regardless of ``reverse``.
    """
    dated: list[tuple[datetime, dict[str, str]]] = []
    undated: list[dict[str, str]] = []

    for record in records:
        stamp = _parse_datetime(record)
        if stamp is not None:
            dated.append((stamp, record))
        else:
            undated.append(record)

    # Sort on the datetime alone so the record dicts are never compared.
    dated.sort(key=lambda pair: pair[0], reverse=reverse)

    ordered = [record for _stamp, record in dated]
    ordered.extend(undated)
    return ordered
def _parse_datetime(record: dict[str, str]) -> datetime | None:
date_value = record.get("v015xxxxdate", "").strip()
time_value = record.get("time", "").strip()
if not date_value or not time_value:
return (1, datetime.max)
return None
try:
parsed = datetime.strptime(f"{date_value} {time_value}", "%Y-%m-%d %H:%M:%S")
return datetime.strptime(f"{date_value} {time_value}", "%Y-%m-%d %H:%M:%S")
except ValueError:
return (1, datetime.max)
return (0, parsed)
return None
def _severity_key(record: dict[str, str]) -> tuple[int, str]:
+72 -6
View File
@@ -1,9 +1,10 @@
import json
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from app.services.exporter import ExportResult
from app.services.exporter import ExportResult, write_export
@dataclass(slots=True)
@@ -14,16 +15,36 @@ class ResultMetadata:
mimetype: str
def persist_result(output_dir: Path, export_result: ExportResult) -> ResultMetadata:
def _result_paths(output_dir: Path, result_id: str) -> tuple[Path, Path]:
    """Return the metadata sidecar path and the extension-less base path for a result id.

    The second element has no suffix; callers use its name as the stem when
    searching for the generated output file.
    """
    base_path = output_dir / f"{result_id}"
    sidecar_path = output_dir / f"{result_id}.json"
    return sidecar_path, base_path
def persist_result(
output_dir: Path,
records: list[dict[str, str]],
union_keys: list[str],
mode: str,
output_format: str,
preview_record_limit: int,
) -> tuple[ResultMetadata, ExportResult]:
"""Persist generated output and sidecar metadata in a temporary directory."""
result_id = uuid.uuid4().hex
extension = "txt" if export_result.output_format == "text" else "csv"
extension = "txt" if output_format == "text" else "csv"
mimetype = "text/plain; charset=utf-8" if extension == "txt" else "text/csv; charset=utf-8"
file_path = output_dir / f"{result_id}.{extension}"
metadata_path = output_dir / f"{result_id}.json"
file_path.write_text(export_result.content, encoding="utf-8")
export_result = write_export(
file_path=file_path,
records=records,
union_keys=union_keys,
mode=mode,
output_format=output_format,
preview_record_limit=preview_record_limit,
)
metadata = ResultMetadata(
result_id=result_id,
file_path=str(file_path),
@@ -31,13 +52,58 @@ def persist_result(output_dir: Path, export_result: ExportResult) -> ResultMetad
mimetype=mimetype,
)
metadata_path.write_text(json.dumps(asdict(metadata)), encoding="utf-8")
return metadata
return metadata, export_result
def load_result_metadata(output_dir: Path, result_id: str) -> dict[str, str] | None:
"""Load sidecar metadata for a generated file."""
metadata_path = output_dir / f"{result_id}.json"
metadata_path, _base_path = _result_paths(output_dir, result_id)
if not metadata_path.exists():
return None
return json.loads(metadata_path.read_text(encoding="utf-8"))
def delete_result_files(output_dir: Path, result_id: str) -> None:
    """Delete a generated output file and its metadata sidecar if they still exist.

    Args:
        output_dir: Directory that holds the generated outputs.
        result_id: Identifier of the result whose files are removed.
    """
    import glob

    metadata_path, base_path = _result_paths(output_dir, result_id)

    # The id can come from a stored sidecar or a request, so escape glob
    # metacharacters: an id such as "*" must not match unrelated files.
    pattern = f"{glob.escape(base_path.name)}.*"
    for output_file in output_dir.glob(pattern):
        if output_file.name == metadata_path.name:
            continue
        output_file.unlink(missing_ok=True)

    # Remove the sidecar last so the result stays discoverable until its
    # output file is gone.
    metadata_path.unlink(missing_ok=True)
def cleanup_expired_outputs(output_dir: Path, retention_hours: int) -> int:
    """Delete generated output sets older than the configured retention window.

    Args:
        output_dir: Directory that holds generated outputs and their sidecars.
        retention_hours: Age in hours beyond which a result set is deleted.

    Returns:
        The number of result sets that were deleted.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(hours=retention_hours)
    deleted_results = 0

    for metadata_path in output_dir.glob("*.json"):
        try:
            payload = json.loads(metadata_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers both invalid JSON and undecodable bytes.
            payload = {}
        if not isinstance(payload, dict):
            # Valid JSON that is not an object (e.g. a list) carries no usable
            # metadata; a damaged sidecar must not abort cleanup.
            payload = {}

        result_id = payload.get("result_id") or metadata_path.stem
        recorded_path = payload.get("file_path")
        file_path = Path(recorded_path) if isinstance(recorded_path, str) else None

        newest_mtime = _newest_mtime(metadata_path, file_path)
        if newest_mtime is None or newest_mtime >= cutoff:
            continue

        delete_result_files(output_dir=output_dir, result_id=result_id)
        deleted_results += 1

    return deleted_results
def _newest_mtime(metadata_path: Path, file_path: Path | None) -> datetime | None:
"""Return the newest modification time across the metadata and output file."""
mtimes: list[datetime] = []
if metadata_path.exists():
mtimes.append(datetime.fromtimestamp(metadata_path.stat().st_mtime, tz=timezone.utc))
if file_path is not None and file_path.exists():
mtimes.append(datetime.fromtimestamp(file_path.stat().st_mtime, tz=timezone.utc))
if not mtimes:
return None
return max(mtimes)
+4 -1
View File
@@ -1,3 +1,6 @@
SECRET_KEY=change-me
APP_ENV=development
MAX_UPLOAD_SIZE_MB=120
OUTPUT_DIRECTORY=/app/instance/outputs
OUTPUT_RETENTION_HOURS=24
CLEANUP_ON_STARTUP=true
CLEANUP_AFTER_DOWNLOAD=false
+3
View File
@@ -12,6 +12,9 @@ class TestConfig:
MAX_CONTENT_LENGTH = 100 * 1024 * 1024
PREVIEW_RECORD_LIMIT = 5
OUTPUT_DIRECTORY = "test-outputs"
OUTPUT_RETENTION_HOURS = 24
CLEANUP_ON_STARTUP = False
CLEANUP_AFTER_DOWNLOAD = False
@pytest.fixture()
+91
View File
@@ -1,4 +1,6 @@
import io
import json
from pathlib import Path
from app import create_app
@@ -132,6 +134,95 @@ def test_download_route_returns_generated_file(client):
download_response.close()
def test_download_route_can_cleanup_files_after_download(tmp_path):
class CleanupAfterDownloadConfig:
TESTING = True
SECRET_KEY = "test-secret"
MAX_CONTENT_LENGTH = 100 * 1024 * 1024
PREVIEW_RECORD_LIMIT = 5
OUTPUT_DIRECTORY = tmp_path / "download-cleanup-outputs"
OUTPUT_RETENTION_HOURS = 24
CLEANUP_ON_STARTUP = False
CLEANUP_AFTER_DOWNLOAD = True
app = create_app(CleanupAfterDownloadConfig)
client = app.test_client()
log_file = io.BytesIO(SAMPLE_LOG.encode("utf-8"))
convert_response = client.post(
"/convert",
data={
"mode": "vendor",
"output_format": "csv",
"sort_by": "datetime",
"order": "asc",
"policy_cs": "",
"policy_ci": "",
"severity_cs": "",
"severity_ci": "",
"log_file": (log_file, "sample.log"),
},
content_type="multipart/form-data",
)
log_file.close()
html = convert_response.data.decode("utf-8")
marker = "/download/"
start = html.index(marker) + len(marker)
end = html.index('"', start)
result_id = html[start:end]
metadata_path = Path(app.config["OUTPUT_DIRECTORY"]) / f"{result_id}.json"
download_response = client.get(f"/download/{result_id}")
download_response.close()
convert_response.close()
assert not metadata_path.exists()
def test_cleanup_on_startup_removes_expired_outputs(tmp_path):
    """Startup cleanup deletes an output set whose files are older than the retention window."""
    import os

    output_dir = tmp_path / "startup-cleanup-outputs"
    output_dir.mkdir(parents=True)

    result_id = "expired-result"
    file_path = output_dir / f"{result_id}.csv"
    metadata_path = output_dir / f"{result_id}.json"
    file_path.write_text("header\nvalue\n", encoding="utf-8")
    metadata_path.write_text(
        json.dumps(
            {
                "result_id": result_id,
                "file_path": str(file_path),
                "download_name": "waf-report.csv",
                "mimetype": "text/csv; charset=utf-8",
            }
        ),
        encoding="utf-8",
    )

    # Backdate both files (2000-01-01 UTC) so they fall well outside the
    # one-hour retention window. write_text already created them, so no
    # touch() is needed first.
    old_timestamp = 946684800
    os.utime(file_path, (old_timestamp, old_timestamp))
    os.utime(metadata_path, (old_timestamp, old_timestamp))

    class StartupCleanupConfig:
        TESTING = True
        SECRET_KEY = "test-secret"
        MAX_CONTENT_LENGTH = 100 * 1024 * 1024
        PREVIEW_RECORD_LIMIT = 5
        OUTPUT_DIRECTORY = output_dir
        OUTPUT_RETENTION_HOURS = 1
        CLEANUP_ON_STARTUP = True
        CLEANUP_AFTER_DOWNLOAD = False

    create_app(StartupCleanupConfig)

    assert not file_path.exists()
    assert not metadata_path.exists()
def test_default_upload_limit_is_100_mib(app):
assert app.config["MAX_CONTENT_LENGTH"] == 100 * 1024 * 1024
+86 -1
View File
@@ -1,4 +1,12 @@
from app.config import _get_max_content_length
import pytest
from app import create_app
from app.config import (
DEVELOPMENT_SECRET_KEY,
_get_max_content_length,
_get_secret_key,
validate_secret_key,
)
def test_max_upload_size_mb_environment_variable(monkeypatch):
@@ -13,3 +21,80 @@ def test_max_content_length_environment_variable_is_supported(monkeypatch):
monkeypatch.setenv("MAX_CONTENT_LENGTH", "2048")
assert _get_max_content_length() == 2048
def test_secret_key_uses_development_fallback(monkeypatch):
monkeypatch.setenv("APP_ENV", "development")
monkeypatch.delenv("FLASK_ENV", raising=False)
monkeypatch.delenv("SECRET_KEY", raising=False)
assert _get_secret_key() == DEVELOPMENT_SECRET_KEY
def test_secret_key_is_required_outside_development(monkeypatch):
monkeypatch.setenv("APP_ENV", "production")
monkeypatch.delenv("FLASK_ENV", raising=False)
monkeypatch.delenv("SECRET_KEY", raising=False)
assert _get_secret_key() == ""
def test_validate_secret_key_rejects_unsafe_value_outside_development(monkeypatch):
monkeypatch.setenv("APP_ENV", "production")
monkeypatch.delenv("FLASK_ENV", raising=False)
with pytest.raises(RuntimeError, match="SECRET_KEY is missing or unsafe"):
validate_secret_key("change-me")
def test_create_app_allows_development_without_explicit_secret_key(tmp_path, monkeypatch):
    """``create_app`` accepts the development fallback key in development.

    With ``APP_ENV=development`` and no ``SECRET_KEY``/``FLASK_ENV`` in the
    environment, a config object carrying ``DEVELOPMENT_SECRET_KEY`` must
    produce an app whose ``SECRET_KEY`` setting is that same value.
    """
    for unset_name in ("FLASK_ENV", "SECRET_KEY"):
        monkeypatch.delenv(unset_name, raising=False)
    monkeypatch.setenv("APP_ENV", "development")

    outputs_path = tmp_path / "dev-outputs"

    class DevelopmentConfig:
        # Secret key under test, followed by the remaining settings.
        SECRET_KEY = DEVELOPMENT_SECRET_KEY
        MAX_CONTENT_LENGTH = 1024
        PREVIEW_RECORD_LIMIT = 5
        OUTPUT_DIRECTORY = outputs_path
        OUTPUT_RETENTION_HOURS = 24
        CLEANUP_ON_STARTUP = False
        CLEANUP_AFTER_DOWNLOAD = False

    created_app = create_app(DevelopmentConfig)

    assert created_app.config["SECRET_KEY"] == DEVELOPMENT_SECRET_KEY
def test_create_app_rejects_unsafe_secret_key_outside_development(tmp_path, monkeypatch):
    """``create_app`` raises ``RuntimeError`` for the key ``"change-me"`` in production.

    The environment is prepared with ``APP_ENV=production`` and ``FLASK_ENV``
    unset; the error message must match the expected text.
    """
    monkeypatch.delenv("FLASK_ENV", raising=False)
    monkeypatch.setenv("APP_ENV", "production")

    outputs_path = tmp_path / "prod-outputs"

    class ProductionConfig:
        # Placeholder key that the app factory is expected to refuse.
        SECRET_KEY = "change-me"
        MAX_CONTENT_LENGTH = 1024
        PREVIEW_RECORD_LIMIT = 5
        OUTPUT_DIRECTORY = outputs_path
        OUTPUT_RETENTION_HOURS = 24
        CLEANUP_ON_STARTUP = False
        CLEANUP_AFTER_DOWNLOAD = False

    expected_message = "SECRET_KEY is missing or unsafe"
    with pytest.raises(RuntimeError, match=expected_message):
        create_app(ProductionConfig)
def test_create_app_rejects_missing_secret_key_outside_development(tmp_path, monkeypatch):
    """``create_app`` raises ``RuntimeError`` for an empty secret key in production.

    The environment is prepared with ``APP_ENV=production`` and ``FLASK_ENV``
    unset; the error message must match the expected text.
    """
    monkeypatch.delenv("FLASK_ENV", raising=False)
    monkeypatch.setenv("APP_ENV", "production")

    outputs_path = tmp_path / "prod-outputs-missing-key"

    class ProductionConfig:
        # Empty key: the app factory is expected to refuse it.
        SECRET_KEY = ""
        MAX_CONTENT_LENGTH = 1024
        PREVIEW_RECORD_LIMIT = 5
        OUTPUT_DIRECTORY = outputs_path
        OUTPUT_RETENTION_HOURS = 24
        CLEANUP_ON_STARTUP = False
        CLEANUP_AFTER_DOWNLOAD = False

    expected_message = "SECRET_KEY is missing or unsafe"
    with pytest.raises(RuntimeError, match=expected_message):
        create_app(ProductionConfig)
+16
View File
@@ -74,3 +74,19 @@ def test_parse_log_file_rebuilds_record_after_embedded_newlines():
assert records[0]["msg"] == "hellobroken-fragmentworld"
assert records[0]["action"] == "Alert"
assert records[1]["msg"] == "next"
def test_parse_log_file_does_not_require_full_stream_read():
    """Check that ``parse_log_file`` parses a stream without an unbounded ``read``.

    The input is a ``BytesIO`` subclass whose ``read`` raises whenever it is
    asked to return everything up to EOF, so the test fails if the parser
    issues such a call. The parsed first record must still expose the quoted
    ``policy`` value.
    """

    class NoFullReadBytesIO(io.BytesIO):
        """In-memory byte stream that refuses read-to-EOF requests."""

        def read(self, size=-1):
            # The io module treats an omitted, ``None`` or negative size as
            # "read until EOF", so reject every one of those spellings and
            # not just the default ``-1``.
            if size is None or size < 0:
                raise AssertionError("full stream read should not be used")
            return super().read(size)

    stream = NoFullReadBytesIO(
        b'v015xxxxdate=2024-02-15 time=09:10:11 policy="Strict Policy" msg="blocked request"\n'
    )

    records, _union_keys = parse_log_file(stream)

    assert records[0]["policy"] == "Strict Policy"
+63 -1
View File
@@ -16,7 +16,7 @@ def test_filter_records_supports_case_insensitive_filters():
mode="vendor",
)
filtered = filter_records(records, options)
filtered = list(filter_records(records, options))
assert filtered == [{"policy": "ProdPolicy", "severity_level": "HIGH"}]
@@ -44,3 +44,65 @@ def test_sort_records_by_severity_desc_uses_defined_ranking():
"medium",
"info",
]
def test_sort_records_by_datetime_asc_places_invalid_records_last():
    """Ascending datetime sort lists valid records first, then the invalid ones.

    Records with an empty date, an empty time, or unparseable values are
    expected after all valid records, in the order they appeared in the input.
    """
    # (date, time, msg) triples; the msg labels identify each record in the result.
    raw_rows = [
        ("2024-05-03", "08:00:00", "latest-valid"),
        ("", "09:00:00", "missing-date"),
        ("2024-05-01", "10:00:00", "earliest-valid"),
        ("2024-05-02", "", "missing-time"),
        ("bad-date", "99:99:99", "invalid-datetime"),
        ("2024-05-02", "09:30:00", "middle-valid"),
    ]
    records = [
        {"v015xxxxdate": date_text, "time": time_text, "msg": label}
        for date_text, time_text, label in raw_rows
    ]
    options = ProcessingOptions(
        policy_cs="",
        policy_ci="",
        severity_cs="",
        severity_ci="",
        sort_by="datetime",
        order="asc",
        mode="vendor",
    )

    sorted_records = sort_records(records, options)
    observed_labels = [record["msg"] for record in sorted_records]

    expected_labels = [
        "earliest-valid",
        "middle-valid",
        "latest-valid",
        "missing-date",
        "missing-time",
        "invalid-datetime",
    ]
    assert observed_labels == expected_labels
def test_sort_records_by_datetime_desc_places_invalid_records_last():
    """Descending datetime sort lists valid records first, then the invalid ones.

    Valid records are expected newest-first; records with an empty date, an
    empty time, or unparseable values follow in their input order.
    """
    # (date, time, msg) triples; the msg labels identify each record in the result.
    raw_rows = [
        ("2024-05-03", "08:00:00", "latest-valid"),
        ("", "09:00:00", "missing-date"),
        ("2024-05-01", "10:00:00", "earliest-valid"),
        ("2024-05-02", "", "missing-time"),
        ("bad-date", "99:99:99", "invalid-datetime"),
        ("2024-05-02", "09:30:00", "middle-valid"),
    ]
    records = [
        {"v015xxxxdate": date_text, "time": time_text, "msg": label}
        for date_text, time_text, label in raw_rows
    ]
    options = ProcessingOptions(
        policy_cs="",
        policy_ci="",
        severity_cs="",
        severity_ci="",
        sort_by="datetime",
        order="desc",
        mode="vendor",
    )

    sorted_records = sort_records(records, options)
    observed_labels = [record["msg"] for record in sorted_records]

    expected_labels = [
        "latest-valid",
        "middle-valid",
        "earliest-valid",
        "missing-date",
        "missing-time",
        "invalid-datetime",
    ]
    assert observed_labels == expected_labels
+75
View File
@@ -0,0 +1,75 @@
import json
import os
from pathlib import Path
from app.services.storage import cleanup_expired_outputs, delete_result_files, persist_result
def test_persist_result_writes_csv_and_collects_preview(tmp_path: Path):
    """``persist_result`` writes a CSV export and returns a previewable result.

    Two records are persisted in ``full`` mode as CSV. The written file must
    contain the header row and the first data row, the download name must be
    ``waf-report.csv``, and a one-record preview must contain exactly one
    newline.
    """
    column_names = ["v015xxxxdate", "time", "policy", "severity_level"]
    first_record = {
        "v015xxxxdate": "2024-05-01",
        "time": "10:00:00",
        "policy": "Prod Policy",
        "severity_level": "high",
    }
    second_record = {
        "v015xxxxdate": "2024-05-02",
        "time": "11:00:00",
        "policy": "Other Policy",
        "severity_level": "low",
    }

    metadata, export_result = persist_result(
        output_dir=tmp_path,
        records=[first_record, second_record],
        union_keys=column_names,
        mode="full",
        output_format="csv",
        preview_record_limit=1,
    )

    csv_text = Path(metadata.file_path).read_text(encoding="utf-8")

    assert metadata.download_name == "waf-report.csv"
    assert "v015xxxxdate,time,policy,severity_level" in csv_text
    assert "2024-05-01,10:00:00,Prod Policy,high" in csv_text
    assert export_result.preview(1).count("\n") == 1
def test_delete_result_files_removes_output_and_metadata(tmp_path: Path):
    """``delete_result_files`` removes both the output file and its JSON metadata."""
    result_id = "delete-me"
    output_file = tmp_path / f"{result_id}.txt"
    metadata_file = tmp_path / f"{result_id}.json"
    for target, payload in ((output_file, "content"), (metadata_file, "{}")):
        target.write_text(payload, encoding="utf-8")

    delete_result_files(output_dir=tmp_path, result_id=result_id)

    assert not output_file.exists()
    assert not metadata_file.exists()
def test_cleanup_expired_outputs_removes_only_old_results(tmp_path: Path):
    """``cleanup_expired_outputs`` deletes only the result older than the retention window.

    Two results (CSV plus JSON metadata) are written. The files of one are
    back-dated to 2000-01-01 UTC, then cleanup runs with a one-hour retention.
    Exactly one result must be reported as deleted and only the fresh pair
    may remain on disk.
    """
    old_result_id = "old-result"
    new_result_id = "new-result"

    old_output = tmp_path / f"{old_result_id}.csv"
    old_metadata = tmp_path / f"{old_result_id}.json"
    new_output = tmp_path / f"{new_result_id}.csv"
    new_metadata = tmp_path / f"{new_result_id}.json"

    old_output.write_text("old", encoding="utf-8")
    new_output.write_text("new", encoding="utf-8")

    old_payload = {"result_id": old_result_id, "file_path": str(old_output)}
    new_payload = {"result_id": new_result_id, "file_path": str(new_output)}
    old_metadata.write_text(json.dumps(old_payload), encoding="utf-8")
    new_metadata.write_text(json.dumps(new_payload), encoding="utf-8")

    # Back-date access and modification times of the old pair to 2000-01-01 UTC.
    old_timestamp = 946684800
    for stale_path in (old_output, old_metadata):
        os.utime(stale_path, (old_timestamp, old_timestamp))

    deleted_results = cleanup_expired_outputs(output_dir=tmp_path, retention_hours=1)

    assert deleted_results == 1
    assert not old_output.exists()
    assert not old_metadata.exists()
    assert new_output.exists()
    assert new_metadata.exists()