Compare commits

14 Commits
dev ... develop

Author SHA1 Message Date
Alfredo Di Stasio
b3c301e69e Merge branch 'feature/fix-invalid-datetime-sorting' into develop 2026-04-27 15:08:46 +02:00
Alfredo Di Stasio
a2ab2674e3 Keep invalid datetimes at end of sort 2026-04-27 15:08:34 +02:00
Alfredo Di Stasio
3e370c25b6 Merge branch 'feature/harden-secret-key-config' into develop 2026-04-27 14:24:18 +02:00
Alfredo Di Stasio
41c63980f0 Harden secret key configuration 2026-04-27 14:23:13 +02:00
Alfredo Di Stasio
846a22c047 Merge branch 'feature/output-cleanup-policy' into develop 2026-04-27 14:18:03 +02:00
Alfredo Di Stasio
b8069d6771 Add output cleanup policy 2026-04-27 14:17:44 +02:00
Alfredo Di Stasio
93cebeb002 Merge branch 'feature/reduce-memory-footprint' into develop 2026-04-27 12:42:38 +02:00
Alfredo Di Stasio
f9f792f6a1 Reduce conversion memory footprint 2026-04-27 11:44:40 +02:00
Alfredo Di Stasio
9313b54abb Move compose settings to env file 2026-04-27 09:48:55 +02:00
Alfredo Di Stasio
15240aee59 Increase compose upload limit 2026-04-24 15:14:01 +02:00
Alfredo Di Stasio
235aa47dd3 Harden parser for malformed multiline records 2026-04-24 15:12:51 +02:00
Alfredo Di Stasio
f64deb9c0d Improve log upload handling 2026-04-24 15:00:43 +02:00
Alfredo Di Stasio
e793b51e4f Merge branch 'feature/flask-waf-log-converter' into develop 2026-04-24 14:41:00 +02:00
Alfredo Di Stasio
355d61f11f Build Flask WAF log converter app 2026-04-24 14:40:32 +02:00
27 changed files with 1984 additions and 1 deletion

6
.dockerignore Normal file
View File

@@ -0,0 +1,6 @@
.git
.pytest_cache
__pycache__
*.pyc
instance
.venv

7
.gitignore vendored Normal file
View File

@@ -0,0 +1,7 @@
__pycache__/
.pytest_cache/
*.pyc
*.pyo
*.egg-info/
.venv/
instance/

37
Dockerfile Normal file
View File

@@ -0,0 +1,37 @@
FROM python:3.12-slim AS base
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
WORKDIR /app
COPY pyproject.toml README.md ./
COPY app ./app
COPY tests ./tests
COPY wsgi.py ./
RUN useradd --create-home appuser && \
mkdir -p /app/instance/outputs && \
chown -R appuser:appuser /app
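# Production stage: install only the runtime dependencies and serve the app with Gunicorn as the non-root user.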
FROM base AS production
ENV OUTPUT_DIRECTORY=/app/instance/outputs
RUN pip install --no-cache-dir .
USER appuser
EXPOSE 8000
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "2", "--threads", "4", "wsgi:app"]
FROM base AS test
ENV OUTPUT_DIRECTORY=/app/instance/outputs
RUN pip install --no-cache-dir ".[dev]"
USER appuser
CMD ["python", "-m", "pytest"]

168
README.md
View File

@@ -1,3 +1,169 @@
# webfortilog
Flask-based web application that converts WAF log files into aligned text reports or CSV exports.
## Features
- Upload a log file (UTF-8, UTF-8 with BOM, Windows-1252, or Latin-1) where each line is a single record
- Parse shell-style `key=value` and `key="value with spaces"` tokens, as shown in the example record after this list
- Support `vendor` mode with fixed columns and `full` mode with dynamic columns
- Filter by policy and severity with case-sensitive or case-insensitive partial matching
- Sort by combined datetime or severity ranking
- Preview results in the browser and download the generated file
- Run locally with Flask or in Docker with Gunicorn
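A single input record looks like the following line (adapted from the repository's test fixtures; the field values are illustrative):
```text
v015xxxxdate=2024-05-01 time=10:00:00 policy="Prod Policy" http_method=GET http_host=example.com http_url="/login" msg="SQL injection blocked" action=blocked severity_level=high
```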
## Project structure
```text
app/
services/
templates/
tests/
Dockerfile
pyproject.toml
wsgi.py
```
## Local usage
### Requirements
- Python 3.12
### Install
```bash
python3.12 -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"
```
### Run
```bash
export FLASK_APP=wsgi.py
export APP_ENV=development
export MAX_UPLOAD_SIZE_MB=100
flask run --debug
```
Open `http://127.0.0.1:5000`.
### Example input file
If you have a local WAF export such as `attack_download.log`, you can use it as a real example upload.
- Example file: `attack_download.log`
- Approximate size in the current workspace: `98.5 MiB`
- The default `MAX_UPLOAD_SIZE_MB=100` setting is sized to accept a file of that size
### Test
```bash
pytest
```
## Docker usage
### Build
```bash
docker build -t webfortilog .
```
### Run
```bash
docker run --rm -p 8000:8000 -e APP_ENV=development -e MAX_UPLOAD_SIZE_MB=100 webfortilog
```
Open `http://127.0.0.1:8000`.
## Docker Compose usage
### Start the web app
```bash
docker compose up --build web
```
Compose settings are stored in `env`. Update that file to change values such as:
- `SECRET_KEY`
- `APP_ENV`
- `MAX_UPLOAD_SIZE_MB`
- `OUTPUT_DIRECTORY`
- `OUTPUT_RETENTION_HOURS`
- `CLEANUP_ON_STARTUP`
- `CLEANUP_AFTER_DOWNLOAD`
For local Docker Compose usage, `APP_ENV=development` allows an internal development-only fallback secret key.
For production-like environments, set a strong `SECRET_KEY` explicitly.
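As a sketch, an `env` file for local Compose usage might look like this (values are illustrative; the repository's `env` file holds the actual defaults):
```text
APP_ENV=development
MAX_UPLOAD_SIZE_MB=100
OUTPUT_DIRECTORY=/app/instance/outputs
OUTPUT_RETENTION_HOURS=24
CLEANUP_ON_STARTUP=true
CLEANUP_AFTER_DOWNLOAD=false
```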
### Run the test suite in a container
```bash
docker compose run --rm test
```
## Example usage
### Browser upload
1. Start the app with `flask run --debug` or `docker compose up --build web`
2. Open the web UI
3. Upload `attack_download.log`
4. Try `vendor` mode with `text` output for a readable preview
5. Try `full` mode with `csv` output for complete export coverage
### Command-line upload example
```bash
curl -X POST http://127.0.0.1:5000/convert \
-F "log_file=@attack_download.log" \
-F "mode=vendor" \
-F "output_format=text" \
-F "sort_by=datetime" \
-F "order=asc" \
-F "policy_cs=" \
-F "policy_ci=" \
-F "severity_cs=" \
-F "severity_ci="
```
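The response is the HTML result page containing a `/download/<result_id>` link. As a sketch, once the id is taken from that page, the generated file can be fetched with curl (the `<result_id>` below is a placeholder):
```bash
curl -OJ http://127.0.0.1:5000/download/<result_id>
```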
## Notes
- Temporary output files are written to `instance/outputs`
- Generated files are cleaned up according to the configured output retention policy
- The application does not require a database
- Gunicorn is used as the production WSGI server
- Parsing and export writing are streamed to reduce memory usage on large uploads
- Sorting still materializes the filtered record set because global ordering by datetime or severity requires the full filtered input
- Default upload limit is 100 MiB
- Set `MAX_UPLOAD_SIZE_MB` to configure the upload limit in megabytes
- `MAX_CONTENT_LENGTH` is also supported as a lower-level byte-based override
- `SECRET_KEY` is required in production-like environments and must not use placeholder values such as `change-me`
- Development-only fallback secret key behavior is enabled only when `APP_ENV=development` or `FLASK_ENV=development`
- `OUTPUT_RETENTION_HOURS` controls how long generated output files are kept
- `CLEANUP_ON_STARTUP=true` removes expired generated files when the app starts
- `CLEANUP_AFTER_DOWNLOAD=true` deletes a result only after the response finishes sending
## Secure configuration example
### Production-like environment
```bash
python3 - <<'PY'
import secrets
print(secrets.token_urlsafe(48))
PY
```
Use the generated value as `SECRET_KEY`, for example:
```bash
docker run --rm -p 8000:8000 \
-e SECRET_KEY='replace-with-a-long-random-secret' \
-e MAX_UPLOAD_SIZE_MB=100 \
webfortilog
```
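For Docker Compose, the same value can instead be placed in the `env` file read by `compose.yaml`, for example (placeholder shown, generate your own value):
```text
APP_ENV=production
SECRET_KEY=replace-with-a-long-random-secret
```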

49
app/__init__.py Normal file
View File

@@ -0,0 +1,49 @@
from pathlib import Path
from flask import Flask, flash, render_template
from werkzeug.exceptions import RequestEntityTooLarge
from app.config import Config, validate_secret_key
from app.routes import main_blueprint
from app.services.storage import cleanup_expired_outputs
def _format_size_limit(size_limit_bytes: int) -> str:
"""Render the upload limit in a friendly unit for error messages."""
if size_limit_bytes >= 1024 * 1024:
return f"{size_limit_bytes / (1024 * 1024):.0f} MB"
if size_limit_bytes >= 1024:
return f"{size_limit_bytes / 1024:.0f} KB"
return f"{size_limit_bytes} bytes"
def create_app(config_class: type[Config] = Config) -> Flask:
"""Application factory used by Flask and Gunicorn."""
app = Flask(__name__, instance_relative_config=True)
app.config.from_object(config_class)
validate_secret_key(app.config["SECRET_KEY"])
output_dir = Path(app.config["OUTPUT_DIRECTORY"])
if not output_dir.is_absolute():
output_dir = Path(app.instance_path) / output_dir
app.config["OUTPUT_DIRECTORY"] = output_dir
output_dir.mkdir(parents=True, exist_ok=True)
if app.config.get("CLEANUP_ON_STARTUP", False):
cleanup_expired_outputs(
output_dir=output_dir,
retention_hours=app.config.get("OUTPUT_RETENTION_HOURS", 24),
)
app.register_blueprint(main_blueprint)
@app.errorhandler(RequestEntityTooLarge)
def handle_file_too_large(_error):
size_limit_bytes = int(app.config["MAX_CONTENT_LENGTH"])
flash(
f"The uploaded file is too large. Maximum allowed size is {_format_size_limit(size_limit_bytes)}.",
"danger",
)
return render_template("index.html"), 413
return app

83
app/config.py Normal file
View File

@@ -0,0 +1,83 @@
import os
from pathlib import Path
DEVELOPMENT_SECRET_KEY = "dev-secret-key-change-me"
UNSAFE_SECRET_KEYS = {
"",
"change-me",
"dev-secret-key-change-me",
"secret",
"default",
}
def _get_bool_setting(name: str, default: bool) -> bool:
"""Parse conventional boolean environment values."""
value = os.environ.get(name)
if value is None:
return default
return value.strip().lower() in {"1", "true", "yes", "on"}
def _get_max_content_length() -> int:
"""Resolve the upload size limit from environment settings."""
upload_limit_mb = os.environ.get("MAX_UPLOAD_SIZE_MB")
if upload_limit_mb:
return int(upload_limit_mb) * 1024 * 1024
max_content_length = os.environ.get("MAX_CONTENT_LENGTH")
if max_content_length:
return int(max_content_length)
return 100 * 1024 * 1024
def _get_app_env() -> str:
"""Resolve the effective application environment."""
return (
os.environ.get("APP_ENV")
or os.environ.get("FLASK_ENV")
or "production"
).strip().lower()
def _is_development_env() -> bool:
"""Return whether the app is explicitly running in development mode."""
return _get_app_env() == "development"
def _get_secret_key() -> str:
"""Resolve the secret key with a development-only fallback."""
secret_key = os.environ.get("SECRET_KEY", "").strip()
if secret_key:
return secret_key
if _is_development_env():
return DEVELOPMENT_SECRET_KEY
return ""
def validate_secret_key(secret_key: str) -> None:
"""Fail fast when a production-like environment uses an unsafe secret key."""
normalized = secret_key.strip()
if _is_development_env():
return
if normalized.lower() in UNSAFE_SECRET_KEYS:
raise RuntimeError(
"SECRET_KEY is missing or unsafe for a production-like environment. "
"Set SECRET_KEY to a long random value, or use APP_ENV=development only for local development."
)
class Config:
"""Default configuration for local and container usage."""
SECRET_KEY = _get_secret_key()
# Default to 100 MiB so larger WAF exports can be processed without tuning.
MAX_CONTENT_LENGTH = _get_max_content_length()
PREVIEW_RECORD_LIMIT = int(os.environ.get("PREVIEW_RECORD_LIMIT", 5))
OUTPUT_DIRECTORY = Path(
os.environ.get("OUTPUT_DIRECTORY", Path("instance") / "outputs")
)
OUTPUT_RETENTION_HOURS = int(os.environ.get("OUTPUT_RETENTION_HOURS", 24))
CLEANUP_ON_STARTUP = _get_bool_setting("CLEANUP_ON_STARTUP", True)
CLEANUP_AFTER_DOWNLOAD = _get_bool_setting("CLEANUP_AFTER_DOWNLOAD", False)

35
app/constants.py Normal file
View File

@@ -0,0 +1,35 @@
VENDOR_FIELDS = [
"v015xxxxdate",
"time",
"policy",
"http_method",
"http_host",
"http_url",
"http_refer",
"service",
"backend_service",
"msg",
"signature_subclass",
"signature_id",
"owasp_top10",
"match_location",
"action",
"severity_level",
]
SEVERITY_RANKING = {
"critical": 5,
"high": 4,
"medium": 3,
"low": 2,
"info": 1,
"informational": 1,
"unknown": 0,
"none": 0,
"n/a": 0,
}
SORTABLE_FIELDS = {"datetime", "severity"}
SORT_ORDERS = {"asc", "desc"}
MODES = {"vendor", "full"}
OUTPUT_FORMATS = {"text", "csv"}

156
app/routes.py Normal file
View File

@@ -0,0 +1,156 @@
from dataclasses import dataclass
from pathlib import Path
from flask import (
Blueprint,
current_app,
flash,
redirect,
render_template,
request,
send_file,
url_for,
)
from werkzeug.datastructures import FileStorage
from werkzeug.wsgi import ClosingIterator
from app.constants import MODES, OUTPUT_FORMATS, SORTABLE_FIELDS, SORT_ORDERS
from app.services.conversion import convert_uploaded_log
from app.services.parser import LogParseError
from app.services.processing import ProcessingError, ProcessingOptions
from app.services.storage import delete_result_files, load_result_metadata
main_blueprint = Blueprint("main", __name__)
@dataclass(slots=True)
class FormData:
mode: str
output_format: str
sort_by: str
order: str
policy_cs: str
policy_ci: str
severity_cs: str
severity_ci: str
def _normalize_form() -> FormData:
return FormData(
mode=request.form.get("mode", "vendor").strip(),
output_format=request.form.get("output_format", "text").strip(),
sort_by=request.form.get("sort_by", "datetime").strip(),
order=request.form.get("order", "asc").strip(),
policy_cs=request.form.get("policy_cs", "").strip(),
policy_ci=request.form.get("policy_ci", "").strip(),
severity_cs=request.form.get("severity_cs", "").strip(),
severity_ci=request.form.get("severity_ci", "").strip(),
)
def _validate_form(file: FileStorage | None, form: FormData) -> list[str]:
errors: list[str] = []
if file is None or not file.filename:
errors.append("Please choose a log file to upload.")
if form.mode not in MODES:
errors.append("Invalid mode selection.")
if form.output_format not in OUTPUT_FORMATS:
errors.append("Invalid output format selection.")
if form.sort_by not in SORTABLE_FIELDS:
errors.append("Invalid sort field selection.")
if form.order not in SORT_ORDERS:
errors.append("Invalid sort order selection.")
if form.policy_cs and form.policy_ci:
errors.append(
"Policy filter must use either case-sensitive or case-insensitive match, not both."
)
if form.severity_cs and form.severity_ci:
errors.append(
"Severity filter must use either case-sensitive or case-insensitive match, not both."
)
return errors
@main_blueprint.get("/")
def index():
return render_template("index.html")
@main_blueprint.post("/convert")
def convert():
uploaded_file = request.files.get("log_file")
form = _normalize_form()
errors = _validate_form(uploaded_file, form)
if errors:
for error in errors:
flash(error, "danger")
return render_template("index.html", form=form), 400
assert uploaded_file is not None
try:
options = ProcessingOptions(
policy_cs=form.policy_cs,
policy_ci=form.policy_ci,
severity_cs=form.severity_cs,
severity_ci=form.severity_ci,
sort_by=form.sort_by,
order=form.order,
mode=form.mode,
)
conversion_result = convert_uploaded_log(
stream=uploaded_file.stream,
options=options,
output_dir=current_app.config["OUTPUT_DIRECTORY"],
output_format=form.output_format,
preview_record_limit=current_app.config["PREVIEW_RECORD_LIMIT"],
)
except (LogParseError, ProcessingError) as exc:
flash(str(exc), "danger")
return render_template("index.html", form=form), 400
except UnicodeDecodeError:
flash(
"The uploaded file could not be decoded. Supported encodings are UTF-8, UTF-8 with BOM, Windows-1252, and Latin-1.",
"danger",
)
return render_template("index.html", form=form), 400
return render_template(
"result.html",
result_id=conversion_result.metadata.result_id,
preview_text=conversion_result.export_result.preview(
current_app.config["PREVIEW_RECORD_LIMIT"]
),
output_format=form.output_format,
record_count=conversion_result.filtered_count,
parsed_count=conversion_result.parsed_count,
filtered_count=conversion_result.filtered_count,
mode=form.mode,
sort_by=form.sort_by,
order=form.order,
)
@main_blueprint.get("/download/<result_id>")
def download(result_id: str):
metadata = load_result_metadata(current_app.config["OUTPUT_DIRECTORY"], result_id)
if metadata is None:
flash("Requested output file could not be found.", "danger")
return redirect(url_for("main.index"))
response = send_file(
Path(metadata["file_path"]),
as_attachment=True,
download_name=metadata["download_name"],
mimetype=metadata["mimetype"],
max_age=0,
)
if current_app.config.get("CLEANUP_AFTER_DOWNLOAD", False):
output_dir = current_app.config["OUTPUT_DIRECTORY"]
response.response = ClosingIterator(
response.response,
[lambda: delete_result_files(output_dir=output_dir, result_id=result_id)],
)
return response

1
app/services/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Service layer for parsing, processing, exporting, and file storage."""

47
app/services/conversion.py Normal file
View File

@@ -0,0 +1,47 @@
from dataclasses import dataclass
from pathlib import Path
from app.services.exporter import ExportResult
from app.services.parser import create_parse_session
from app.services.processing import ProcessingOptions, filter_records, sort_records
from app.services.storage import ResultMetadata, persist_result
@dataclass(slots=True)
class ConversionResult:
metadata: ResultMetadata
export_result: ExportResult
parsed_count: int
filtered_count: int
def convert_uploaded_log(
stream,
options: ProcessingOptions,
output_dir: Path,
output_format: str,
preview_record_limit: int,
) -> ConversionResult:
"""Convert an uploaded log into a persisted export with a small in-memory preview.
Parsing, filtering, and export writing are streamed to keep memory usage low.
Sorting still materializes the filtered records because global ordering by datetime
or severity requires seeing the whole filtered result set first.
"""
parse_session = create_parse_session(stream)
sorted_records = sort_records(filter_records(parse_session.iter_records(), options), options)
metadata, export_result = persist_result(
output_dir=output_dir,
records=sorted_records,
union_keys=parse_session.union_keys(),
mode=options.mode,
output_format=output_format,
preview_record_limit=preview_record_limit,
)
return ConversionResult(
metadata=metadata,
export_result=export_result,
parsed_count=parse_session.parsed_count,
filtered_count=len(sorted_records),
)

107
app/services/exporter.py Normal file
View File

@@ -0,0 +1,107 @@
import csv
import io
from dataclasses import dataclass
from pathlib import Path
from typing import Sequence, TextIO
from app.constants import VENDOR_FIELDS
@dataclass(slots=True)
class ExportResult:
columns: list[str]
output_format: str
preview_text: str
def preview(self, _record_limit: int) -> str:
"""Return the preview that was collected during export writing."""
return self.preview_text
def write_export(
file_path: Path,
records: Sequence[dict[str, str]],
union_keys: list[str],
mode: str,
output_format: str,
preview_record_limit: int,
) -> ExportResult:
"""Write the final export directly to disk and keep only a small preview in memory."""
columns = VENDOR_FIELDS if mode == "vendor" else union_keys
with file_path.open("w", encoding="utf-8", newline="") as export_file:
if output_format == "text":
preview_text = _write_text(
export_file=export_file,
records=records,
columns=columns,
preview_record_limit=preview_record_limit,
)
else:
preview_text = _write_csv(
export_file=export_file,
records=records,
columns=columns,
preview_record_limit=preview_record_limit,
)
return ExportResult(
columns=columns,
output_format=output_format,
preview_text=preview_text,
)
def _write_text(
export_file: TextIO,
records: Sequence[dict[str, str]],
columns: list[str],
preview_record_limit: int,
) -> str:
max_key_length = max((len(column) for column in columns), default=0)
preview_lines: list[str] = []
wrote_line = False
for index, record in enumerate(records, start=1):
header = f"--- record {index} ---"
wrote_line = _write_line(export_file, header, wrote_line)
if index <= preview_record_limit:
preview_lines.append(header)
for column in columns:
line = f" {column.ljust(max_key_length)} = {record.get(column, '')}"
wrote_line = _write_line(export_file, line, wrote_line)
if index <= preview_record_limit:
preview_lines.append(line)
return "\n".join(preview_lines)
def _write_csv(
export_file: TextIO,
records: Sequence[dict[str, str]],
columns: list[str],
preview_record_limit: int,
) -> str:
writer = csv.DictWriter(export_file, fieldnames=columns, extrasaction="ignore")
writer.writeheader()
preview_buffer = io.StringIO()
preview_writer = csv.DictWriter(preview_buffer, fieldnames=columns, extrasaction="ignore")
preview_writer.writeheader()
for index, record in enumerate(records, start=1):
row = {column: record.get(column, "") for column in columns}
writer.writerow(row)
if index <= preview_record_limit:
preview_writer.writerow(row)
return preview_buffer.getvalue().rstrip("\n")
def _write_line(export_file: TextIO, line: str, wrote_line: bool) -> bool:
"""Write lines without leaving a trailing newline at the end of the file."""
if wrote_line:
export_file.write("\n")
export_file.write(line)
return True

187
app/services/parser.py Normal file
View File

@@ -0,0 +1,187 @@
import codecs
from collections import OrderedDict
from dataclasses import dataclass, field
from io import BufferedIOBase, TextIOBase
import re
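# Every logical record starts with this token; it is also used to stitch together records split across physical lines.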
RECORD_PREFIX = "v015xxxxdate="
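# A key is an identifier followed by '=', matched only at the start of a record or immediately after whitespace.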
KEY_PATTERN = re.compile(r"(?:(?<=\s)|^)([A-Za-z_][A-Za-z0-9_]*)=")
class LogParseError(ValueError):
"""Raised when the uploaded log file cannot be parsed."""
@dataclass(slots=True)
class ParseSession:
"""Stateful streamed parser for uploaded log files."""
stream: BufferedIOBase | TextIOBase
encoding: str | None
_union_keys: OrderedDict[str, None] = field(default_factory=OrderedDict)
parsed_count: int = 0
_consumed: bool = False
def iter_records(self):
if self._consumed:
raise RuntimeError("ParseSession records can only be consumed once.")
self._consumed = True
for line_number, line in _iter_logical_records(_iter_physical_lines(self.stream, self.encoding)):
record = _parse_record(line, line_number)
for key in record:
self._union_keys.setdefault(key, None)
self.parsed_count += 1
yield record
def union_keys(self) -> list[str]:
return list(self._union_keys.keys())
def create_parse_session(stream: BufferedIOBase | TextIOBase) -> ParseSession:
"""Prepare a streamed parser session without materializing the full upload in memory."""
return ParseSession(stream=stream, encoding=_resolve_stream_encoding(stream))
def _normalize_value(value: str) -> str:
"""Remove balanced shell-style quotes while tolerating malformed values."""
value = value.strip()
if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}:
return value[1:-1]
if value[:1] in {'"', "'"}:
return value[1:]
return value
def _resolve_stream_encoding(stream: BufferedIOBase | TextIOBase) -> str | None:
"""Detect the most suitable stream encoding without reading the full file into memory."""
probe = stream.read(0)
if isinstance(probe, str):
return None
for encoding in ("utf-8-sig", "cp1252", "latin-1"):
try:
_validate_stream_encoding(stream, encoding)
return encoding
except UnicodeDecodeError:
continue
raise UnicodeDecodeError("unknown", b"", 0, 1, "Unsupported text encoding.")
def _validate_stream_encoding(stream: BufferedIOBase | TextIOBase, encoding: str) -> None:
"""Scan the stream to verify that the candidate encoding can decode it fully."""
_rewind_stream(stream)
decoder = codecs.getincrementaldecoder(encoding)()
for chunk in iter(lambda: stream.read(64 * 1024), b""):
decoder.decode(chunk, final=False)
decoder.decode(b"", final=True)
_rewind_stream(stream)
def _iter_physical_lines(
stream: BufferedIOBase | TextIOBase,
encoding: str | None,
):
"""Yield decoded physical lines from the uploaded stream without full-file buffering."""
_rewind_stream(stream)
if encoding is None:
for line_number, raw_line in enumerate(stream, start=1):
yield line_number, raw_line
return
line_number = 1
decoder = codecs.getincrementaldecoder(encoding)()
pending = ""
for chunk in iter(lambda: stream.read(64 * 1024), b""):
text = decoder.decode(chunk, final=False)
pending += text
while True:
newline_index = pending.find("\n")
if newline_index == -1:
break
line = pending[: newline_index + 1]
pending = pending[newline_index + 1 :]
yield line_number, line
line_number += 1
pending += decoder.decode(b"", final=True)
while True:
newline_index = pending.find("\n")
if newline_index == -1:
break
line = pending[: newline_index + 1]
pending = pending[newline_index + 1 :]
yield line_number, line
line_number += 1
if pending:
yield line_number, pending
def _rewind_stream(stream: BufferedIOBase | TextIOBase) -> None:
"""Move the uploaded stream back to the start."""
if not hasattr(stream, "seek"):
raise LogParseError("The uploaded file stream is not seekable.")
stream.seek(0)
def _parse_record(line: str, line_number: int) -> dict[str, str]:
"""Parse a logical record by locating `key=` boundaries instead of splitting on spaces."""
matches = list(KEY_PATTERN.finditer(line))
if not matches:
raise LogParseError(f"Line {line_number}: no key=value pairs were found.")
record: dict[str, str] = {}
for index, match in enumerate(matches):
key = match.group(1)
value_start = match.end()
value_end = matches[index + 1].start() if index + 1 < len(matches) else len(line)
raw_value = line[value_start:value_end].strip()
if raw_value and raw_value[:1] not in {'"', "'"} and any(
char.isspace() for char in raw_value
):
raise LogParseError(
f"Line {line_number}: invalid unquoted value for key '{key}'."
)
value = _normalize_value(raw_value)
record[key] = value
return record
def _iter_logical_records(physical_lines):
"""Rebuild logical records when embedded newlines split a single log entry."""
current_record: list[str] = []
current_start_line: int | None = None
for line_number, raw_line in physical_lines:
line = raw_line.strip()
if not line:
continue
if line.startswith(RECORD_PREFIX):
if current_record and current_start_line is not None:
yield current_start_line, "".join(current_record)
current_record = [line]
current_start_line = line_number
continue
if current_record:
current_record.append(line)
continue
raise LogParseError(
f"Line {line_number}: unexpected content before the first log record."
)
if current_record and current_start_line is not None:
yield current_start_line, "".join(current_record)
def parse_log_file(stream: BufferedIOBase | TextIOBase) -> tuple[list[dict[str, str]], list[str]]:
"""Compatibility helper that still materializes all parsed records when needed."""
session = create_parse_session(stream)
records = list(session.iter_records())
return records, session.union_keys()

92
app/services/processing.py Normal file
View File

@@ -0,0 +1,92 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable
from app.constants import SEVERITY_RANKING
class ProcessingError(ValueError):
"""Raised when records cannot be processed according to the selected options."""
@dataclass(slots=True)
class ProcessingOptions:
policy_cs: str
policy_ci: str
severity_cs: str
severity_ci: str
sort_by: str
order: str
mode: str
def filter_records(
records: Iterable[dict[str, str]], options: ProcessingOptions
) -> Iterable[dict[str, str]]:
"""Apply user-selected filters lazily to parsed records."""
for record in records:
policy_value = record.get("policy", "")
severity_value = record.get("severity_level", "")
if options.policy_cs and options.policy_cs not in policy_value:
continue
if options.policy_ci and options.policy_ci.lower() not in policy_value.lower():
continue
if options.severity_cs and options.severity_cs not in severity_value:
continue
if options.severity_ci and options.severity_ci.lower() not in severity_value.lower():
continue
yield record
def sort_records(
records: Iterable[dict[str, str]], options: ProcessingOptions
) -> list[dict[str, str]]:
"""Sort records by datetime or severity using the requested order."""
reverse = options.order == "desc"
if options.sort_by == "datetime":
return _sort_records_by_datetime(records, reverse)
elif options.sort_by == "severity":
key_func = _severity_key
else:
raise ProcessingError("Unsupported sort field.")
return sorted(records, key=key_func, reverse=reverse)
def _sort_records_by_datetime(
records: Iterable[dict[str, str]], reverse: bool
) -> list[dict[str, str]]:
"""Sort valid datetimes normally and always place invalid/missing values last."""
valid_records: list[tuple[datetime, dict[str, str]]] = []
invalid_records: list[dict[str, str]] = []
for record in records:
parsed_datetime = _parse_datetime(record)
if parsed_datetime is None:
invalid_records.append(record)
continue
valid_records.append((parsed_datetime, record))
sorted_valid_records = sorted(valid_records, key=lambda item: item[0], reverse=reverse)
return [record for _parsed, record in sorted_valid_records] + invalid_records
def _parse_datetime(record: dict[str, str]) -> datetime | None:
date_value = record.get("v015xxxxdate", "").strip()
time_value = record.get("time", "").strip()
if not date_value or not time_value:
return None
try:
return datetime.strptime(f"{date_value} {time_value}", "%Y-%m-%d %H:%M:%S")
except ValueError:
return None
def _severity_key(record: dict[str, str]) -> tuple[int, str]:
raw_value = record.get("severity_level", "").strip().lower()
rank = SEVERITY_RANKING.get(raw_value, 0)
return (rank, raw_value)

109
app/services/storage.py Normal file
View File

@@ -0,0 +1,109 @@
import json
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from app.services.exporter import ExportResult, write_export
@dataclass(slots=True)
class ResultMetadata:
result_id: str
file_path: str
download_name: str
mimetype: str
def _result_paths(output_dir: Path, result_id: str) -> tuple[Path, Path]:
"""Build the sidecar metadata and output file search pattern for a result id."""
metadata_path = output_dir / f"{result_id}.json"
return metadata_path, output_dir / f"{result_id}"
def persist_result(
output_dir: Path,
records: list[dict[str, str]],
union_keys: list[str],
mode: str,
output_format: str,
preview_record_limit: int,
) -> tuple[ResultMetadata, ExportResult]:
"""Persist generated output and sidecar metadata in a temporary directory."""
result_id = uuid.uuid4().hex
extension = "txt" if output_format == "text" else "csv"
mimetype = "text/plain; charset=utf-8" if extension == "txt" else "text/csv; charset=utf-8"
file_path = output_dir / f"{result_id}.{extension}"
metadata_path = output_dir / f"{result_id}.json"
export_result = write_export(
file_path=file_path,
records=records,
union_keys=union_keys,
mode=mode,
output_format=output_format,
preview_record_limit=preview_record_limit,
)
metadata = ResultMetadata(
result_id=result_id,
file_path=str(file_path),
download_name=f"waf-report.{extension}",
mimetype=mimetype,
)
metadata_path.write_text(json.dumps(asdict(metadata)), encoding="utf-8")
return metadata, export_result
def load_result_metadata(output_dir: Path, result_id: str) -> dict[str, str] | None:
"""Load sidecar metadata for a generated file."""
metadata_path, _base_path = _result_paths(output_dir, result_id)
if not metadata_path.exists():
return None
return json.loads(metadata_path.read_text(encoding="utf-8"))
def delete_result_files(output_dir: Path, result_id: str) -> None:
"""Delete a generated output file and its metadata sidecar if they still exist."""
metadata_path, base_path = _result_paths(output_dir, result_id)
for output_file in output_dir.glob(f"{base_path.name}.*"):
if output_file.name == metadata_path.name:
continue
output_file.unlink(missing_ok=True)
metadata_path.unlink(missing_ok=True)
def cleanup_expired_outputs(output_dir: Path, retention_hours: int) -> int:
"""Delete generated output sets older than the configured retention window."""
cutoff = datetime.now(timezone.utc) - timedelta(hours=retention_hours)
deleted_results = 0
for metadata_path in output_dir.glob("*.json"):
try:
payload = json.loads(metadata_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
payload = {}
result_id = payload.get("result_id") or metadata_path.stem
file_path = Path(payload["file_path"]) if "file_path" in payload else None
newest_mtime = _newest_mtime(metadata_path, file_path)
if newest_mtime is None or newest_mtime >= cutoff:
continue
delete_result_files(output_dir=output_dir, result_id=result_id)
deleted_results += 1
return deleted_results
def _newest_mtime(metadata_path: Path, file_path: Path | None) -> datetime | None:
"""Return the newest modification time across the metadata and output file."""
mtimes: list[datetime] = []
if metadata_path.exists():
mtimes.append(datetime.fromtimestamp(metadata_path.stat().st_mtime, tz=timezone.utc))
if file_path is not None and file_path.exists():
mtimes.append(datetime.fromtimestamp(file_path.stat().st_mtime, tz=timezone.utc))
if not mtimes:
return None
return max(mtimes)

38
app/templates/base.html Normal file
View File

@@ -0,0 +1,38 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>WAF Log Converter</title>
<link
href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.3/dist/css/bootstrap.min.css"
rel="stylesheet"
integrity="sha384-QWTKZyjpPEjISv5WaRU9OFeRpok6YctnYmDr5pNlyT2bRjXh0JMhjY6hW+ALEwIH"
crossorigin="anonymous"
>
</head>
<body class="bg-body-tertiary">
<main class="container py-5">
<div class="row justify-content-center">
<div class="col-lg-10">
<div class="mb-4">
<h1 class="display-6 fw-semibold">WAF Log Converter</h1>
<p class="text-secondary mb-0">
Upload a UTF-8 WAF log file and export a filtered report as readable text or CSV.
</p>
</div>
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
<div class="alert alert-{{ category }}" role="alert">{{ message }}</div>
{% endfor %}
{% endif %}
{% endwith %}
{% block content %}{% endblock %}
</div>
</div>
</main>
</body>
</html>

100
app/templates/index.html Normal file
View File

@@ -0,0 +1,100 @@
{% extends "base.html" %}
{% set form = form or none %}
{% block content %}
<div class="card shadow-sm border-0">
<div class="card-body p-4">
<form method="post" action="{{ url_for('main.convert') }}" enctype="multipart/form-data" novalidate>
<div class="mb-4">
<label for="log_file" class="form-label fw-semibold">Log file</label>
<input class="form-control" id="log_file" name="log_file" type="file" required>
<div class="form-text">Each line must contain one record using shell-like key/value tokens.</div>
</div>
<div class="row g-3">
<div class="col-md-3">
<label for="mode" class="form-label">Mode</label>
<select class="form-select" id="mode" name="mode">
<option value="vendor" {% if form and form.mode == "vendor" %}selected{% endif %}>Vendor</option>
<option value="full" {% if form and form.mode == "full" %}selected{% endif %}>Full</option>
</select>
</div>
<div class="col-md-3">
<label for="output_format" class="form-label">Format</label>
<select class="form-select" id="output_format" name="output_format">
<option value="text" {% if form and form.output_format == "text" %}selected{% endif %}>Text</option>
<option value="csv" {% if form and form.output_format == "csv" %}selected{% endif %}>CSV</option>
</select>
</div>
<div class="col-md-3">
<label for="sort_by" class="form-label">Sort by</label>
<select class="form-select" id="sort_by" name="sort_by">
<option value="datetime" {% if not form or form.sort_by == "datetime" %}selected{% endif %}>Datetime</option>
<option value="severity" {% if form and form.sort_by == "severity" %}selected{% endif %}>Severity</option>
</select>
</div>
<div class="col-md-3">
<label for="order" class="form-label">Order</label>
<select class="form-select" id="order" name="order">
<option value="asc" {% if not form or form.order == "asc" %}selected{% endif %}>Ascending</option>
<option value="desc" {% if form and form.order == "desc" %}selected{% endif %}>Descending</option>
</select>
</div>
</div>
<hr class="my-4">
<div class="row g-3">
<div class="col-md-6">
<label for="policy_cs" class="form-label">Policy filter, case-sensitive</label>
<input
class="form-control"
id="policy_cs"
name="policy_cs"
type="text"
value="{{ form.policy_cs if form else '' }}"
>
</div>
<div class="col-md-6">
<label for="policy_ci" class="form-label">Policy filter, case-insensitive</label>
<input
class="form-control"
id="policy_ci"
name="policy_ci"
type="text"
value="{{ form.policy_ci if form else '' }}"
>
</div>
<div class="col-md-6">
<label for="severity_cs" class="form-label">Severity filter, case-sensitive</label>
<input
class="form-control"
id="severity_cs"
name="severity_cs"
type="text"
value="{{ form.severity_cs if form else '' }}"
>
</div>
<div class="col-md-6">
<label for="severity_ci" class="form-label">Severity filter, case-insensitive</label>
<input
class="form-control"
id="severity_ci"
name="severity_ci"
type="text"
value="{{ form.severity_ci if form else '' }}"
>
</div>
</div>
<div class="alert alert-light border mt-4 mb-0" role="note">
Use only one policy filter and one severity filter at a time. Matching happens as a partial substring.
</div>
<div class="mt-4 d-flex gap-2">
<button class="btn btn-primary" type="submit">Convert log</button>
<button class="btn btn-outline-secondary" type="reset">Reset</button>
</div>
</form>
</div>
</div>
{% endblock %}

45
app/templates/result.html Normal file
View File

@@ -0,0 +1,45 @@
{% extends "base.html" %}
{% block content %}
<div class="row g-4">
<div class="col-lg-4">
<div class="card shadow-sm border-0 h-100">
<div class="card-body">
<h2 class="h4">Result summary</h2>
<dl class="row mb-4">
<dt class="col-sm-5">Parsed records</dt>
<dd class="col-sm-7">{{ parsed_count }}</dd>
<dt class="col-sm-5">Output records</dt>
<dd class="col-sm-7">{{ filtered_count }}</dd>
<dt class="col-sm-5">Mode</dt>
<dd class="col-sm-7 text-capitalize">{{ mode }}</dd>
<dt class="col-sm-5">Format</dt>
<dd class="col-sm-7 text-uppercase">{{ output_format }}</dd>
<dt class="col-sm-5">Sort</dt>
<dd class="col-sm-7">{{ sort_by }} / {{ order }}</dd>
</dl>
<div class="d-grid gap-2">
<a class="btn btn-primary" href="{{ url_for('main.download', result_id=result_id) }}">
Download export
</a>
<a class="btn btn-outline-secondary" href="{{ url_for('main.index') }}">
Convert another file
</a>
</div>
</div>
</div>
</div>
<div class="col-lg-8">
<div class="card shadow-sm border-0">
<div class="card-body">
<div class="d-flex justify-content-between align-items-center mb-3">
<h2 class="h4 mb-0">Preview</h2>
<span class="badge text-bg-secondary">Showing up to {{ record_count if record_count < 5 else 5 }} records</span>
</div>
<pre class="bg-dark-subtle p-3 rounded small mb-0" style="white-space: pre-wrap;">{{ preview_text }}</pre>
</div>
</div>
</div>
</div>
{% endblock %}

16
compose.yaml Normal file
View File

@@ -0,0 +1,16 @@
services:
web:
build:
context: .
target: production
ports:
- "8000:8000"
env_file:
- env
test:
build:
context: .
target: test
env_file:
- env

6
env Normal file
View File

@@ -0,0 +1,6 @@
APP_ENV=development
MAX_UPLOAD_SIZE_MB=120
OUTPUT_DIRECTORY=/app/instance/outputs
OUTPUT_RETENTION_HOURS=24
CLEANUP_ON_STARTUP=true
CLEANUP_AFTER_DOWNLOAD=false

28
pyproject.toml Normal file
View File

@@ -0,0 +1,28 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "webfortilog"
version = "0.1.0"
description = "Flask application to convert WAF log files into text or CSV reports."
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"Flask>=3.0,<4.0",
"gunicorn>=22.0,<24.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0,<9.0",
]
[tool.pytest.ini_options]
testpaths = ["tests"]
filterwarnings = [
"error",
]
[tool.setuptools]
packages = ["app", "app.services"]

29
tests/conftest.py Normal file
View File

@@ -0,0 +1,29 @@
import shutil
from pathlib import Path
import pytest
from app import create_app
class TestConfig:
TESTING = True
SECRET_KEY = "test-secret"
MAX_CONTENT_LENGTH = 100 * 1024 * 1024
PREVIEW_RECORD_LIMIT = 5
OUTPUT_DIRECTORY = "test-outputs"
OUTPUT_RETENTION_HOURS = 24
CLEANUP_ON_STARTUP = False
CLEANUP_AFTER_DOWNLOAD = False
@pytest.fixture()
def app():
flask_app = create_app(TestConfig)
yield flask_app
shutil.rmtree(Path(flask_app.instance_path) / "test-outputs", ignore_errors=True)
@pytest.fixture()
def client(app):
return app.test_client()

261
tests/test_app.py Normal file
View File

@@ -0,0 +1,261 @@
import io
import json
from pathlib import Path
from app import create_app
SAMPLE_LOG = (
'v015xxxxdate=2024-05-01 time=10:00:00 policy="Prod Policy" '
'http_method=GET http_host=example.com http_url="/login" '
'http_refer="https://ref.example" service=edge backend_service=api '
'msg="SQL injection blocked" signature_subclass=SQL signature_id=942100 '
'owasp_top10=A03 match_location=body action=blocked severity_level=high\n'
'v015xxxxdate=2024-05-02 time=11:00:00 policy="Prod Policy" '
'http_method=POST http_host=example.com http_url="/checkout" '
'http_refer="https://shop.example" service=edge backend_service=orders '
'msg="XSS blocked" signature_subclass=XSS signature_id=941100 '
'owasp_top10=A03 match_location=query action=monitored severity_level=medium\n'
)
def test_index_page_loads(client):
response = client.get("/")
assert response.status_code == 200
assert b"WAF Log Converter" in response.data
def test_convert_returns_text_preview_and_download_link(client):
log_file = io.BytesIO(SAMPLE_LOG.encode("utf-8"))
response = client.post(
"/convert",
data={
"mode": "vendor",
"output_format": "text",
"sort_by": "severity",
"order": "desc",
"policy_cs": "",
"policy_ci": "prod",
"severity_cs": "",
"severity_ci": "",
"log_file": (log_file, "sample.log"),
},
content_type="multipart/form-data",
)
log_file.close()
assert response.status_code == 200
assert b"Download export" in response.data
assert b"--- record 1 ---" in response.data
response.close()
def test_convert_full_mode_csv_preserves_union_order(client):
log_file = io.BytesIO(SAMPLE_LOG.encode("utf-8"))
response = client.post(
"/convert",
data={
"mode": "full",
"output_format": "csv",
"sort_by": "datetime",
"order": "asc",
"policy_cs": "",
"policy_ci": "",
"severity_cs": "",
"severity_ci": "",
"log_file": (log_file, "sample.log"),
},
content_type="multipart/form-data",
)
log_file.close()
assert response.status_code == 200
assert b"TEXT" not in response.data
assert b"Download export" in response.data
response.close()
def test_convert_rejects_mutually_exclusive_filters(client):
log_file = io.BytesIO(SAMPLE_LOG.encode("utf-8"))
response = client.post(
"/convert",
data={
"mode": "vendor",
"output_format": "csv",
"sort_by": "datetime",
"order": "asc",
"policy_cs": "A",
"policy_ci": "a",
"severity_cs": "",
"severity_ci": "",
"log_file": (log_file, "sample.log"),
},
content_type="multipart/form-data",
)
log_file.close()
assert response.status_code == 400
assert b"Policy filter must use either case-sensitive or case-insensitive match" in response.data
response.close()
def test_download_route_returns_generated_file(client):
log_file = io.BytesIO(SAMPLE_LOG.encode("utf-8"))
convert_response = client.post(
"/convert",
data={
"mode": "vendor",
"output_format": "csv",
"sort_by": "datetime",
"order": "asc",
"policy_cs": "",
"policy_ci": "",
"severity_cs": "",
"severity_ci": "",
"log_file": (log_file, "sample.log"),
},
content_type="multipart/form-data",
)
log_file.close()
html = convert_response.data.decode("utf-8")
marker = '/download/'
start = html.index(marker) + len(marker)
end = html.index('"', start)
result_id = html[start:end]
download_response = client.get(f"/download/{result_id}")
assert download_response.status_code == 200
assert download_response.headers["Content-Type"].startswith("text/csv")
assert b"v015xxxxdate,time,policy" in download_response.data
convert_response.close()
download_response.close()
def test_download_route_can_cleanup_files_after_download(tmp_path):
class CleanupAfterDownloadConfig:
TESTING = True
SECRET_KEY = "test-secret"
MAX_CONTENT_LENGTH = 100 * 1024 * 1024
PREVIEW_RECORD_LIMIT = 5
OUTPUT_DIRECTORY = tmp_path / "download-cleanup-outputs"
OUTPUT_RETENTION_HOURS = 24
CLEANUP_ON_STARTUP = False
CLEANUP_AFTER_DOWNLOAD = True
app = create_app(CleanupAfterDownloadConfig)
client = app.test_client()
log_file = io.BytesIO(SAMPLE_LOG.encode("utf-8"))
convert_response = client.post(
"/convert",
data={
"mode": "vendor",
"output_format": "csv",
"sort_by": "datetime",
"order": "asc",
"policy_cs": "",
"policy_ci": "",
"severity_cs": "",
"severity_ci": "",
"log_file": (log_file, "sample.log"),
},
content_type="multipart/form-data",
)
log_file.close()
html = convert_response.data.decode("utf-8")
marker = "/download/"
start = html.index(marker) + len(marker)
end = html.index('"', start)
result_id = html[start:end]
metadata_path = Path(app.config["OUTPUT_DIRECTORY"]) / f"{result_id}.json"
download_response = client.get(f"/download/{result_id}")
download_response.close()
convert_response.close()
assert not metadata_path.exists()
def test_cleanup_on_startup_removes_expired_outputs(tmp_path):
output_dir = tmp_path / "startup-cleanup-outputs"
output_dir.mkdir(parents=True)
result_id = "expired-result"
file_path = output_dir / f"{result_id}.csv"
metadata_path = output_dir / f"{result_id}.json"
file_path.write_text("header\nvalue\n", encoding="utf-8")
metadata_path.write_text(
json.dumps(
{
"result_id": result_id,
"file_path": str(file_path),
"download_name": "waf-report.csv",
"mimetype": "text/csv; charset=utf-8",
}
),
encoding="utf-8",
)
    old_timestamp = 946684800
    import os
    os.utime(file_path, (old_timestamp, old_timestamp))
    os.utime(metadata_path, (old_timestamp, old_timestamp))
class StartupCleanupConfig:
TESTING = True
SECRET_KEY = "test-secret"
MAX_CONTENT_LENGTH = 100 * 1024 * 1024
PREVIEW_RECORD_LIMIT = 5
OUTPUT_DIRECTORY = output_dir
OUTPUT_RETENTION_HOURS = 1
CLEANUP_ON_STARTUP = True
CLEANUP_AFTER_DOWNLOAD = False
create_app(StartupCleanupConfig)
assert not file_path.exists()
assert not metadata_path.exists()
def test_default_upload_limit_is_100_mib(app):
assert app.config["MAX_CONTENT_LENGTH"] == 100 * 1024 * 1024
def test_too_large_upload_returns_friendly_message(tmp_path):
class SmallLimitConfig:
TESTING = True
SECRET_KEY = "test-secret"
MAX_CONTENT_LENGTH = 128
PREVIEW_RECORD_LIMIT = 5
OUTPUT_DIRECTORY = tmp_path / "tiny-limit-outputs"
app = create_app(SmallLimitConfig)
client = app.test_client()
log_file = io.BytesIO(SAMPLE_LOG.encode("utf-8"))
response = client.post(
"/convert",
data={
"mode": "vendor",
"output_format": "text",
"sort_by": "datetime",
"order": "asc",
"policy_cs": "",
"policy_ci": "",
"severity_cs": "",
"severity_ci": "",
"log_file": (log_file, "sample.log"),
},
content_type="multipart/form-data",
)
log_file.close()
assert response.status_code == 413
assert b"Maximum allowed size is 128 bytes." in response.data
response.close()

100
tests/test_config.py Normal file
View File

@@ -0,0 +1,100 @@
import pytest
from app import create_app
from app.config import (
DEVELOPMENT_SECRET_KEY,
_get_max_content_length,
_get_secret_key,
validate_secret_key,
)
def test_max_upload_size_mb_environment_variable(monkeypatch):
monkeypatch.setenv("MAX_UPLOAD_SIZE_MB", "42")
monkeypatch.delenv("MAX_CONTENT_LENGTH", raising=False)
assert _get_max_content_length() == 42 * 1024 * 1024
def test_max_content_length_environment_variable_is_supported(monkeypatch):
monkeypatch.delenv("MAX_UPLOAD_SIZE_MB", raising=False)
monkeypatch.setenv("MAX_CONTENT_LENGTH", "2048")
assert _get_max_content_length() == 2048
def test_secret_key_uses_development_fallback(monkeypatch):
monkeypatch.setenv("APP_ENV", "development")
monkeypatch.delenv("FLASK_ENV", raising=False)
monkeypatch.delenv("SECRET_KEY", raising=False)
assert _get_secret_key() == DEVELOPMENT_SECRET_KEY
def test_secret_key_is_required_outside_development(monkeypatch):
monkeypatch.setenv("APP_ENV", "production")
monkeypatch.delenv("FLASK_ENV", raising=False)
monkeypatch.delenv("SECRET_KEY", raising=False)
assert _get_secret_key() == ""
def test_validate_secret_key_rejects_unsafe_value_outside_development(monkeypatch):
monkeypatch.setenv("APP_ENV", "production")
monkeypatch.delenv("FLASK_ENV", raising=False)
with pytest.raises(RuntimeError, match="SECRET_KEY is missing or unsafe"):
validate_secret_key("change-me")
def test_create_app_allows_development_without_explicit_secret_key(tmp_path, monkeypatch):
monkeypatch.setenv("APP_ENV", "development")
monkeypatch.delenv("FLASK_ENV", raising=False)
monkeypatch.delenv("SECRET_KEY", raising=False)
class DevelopmentConfig:
SECRET_KEY = DEVELOPMENT_SECRET_KEY
MAX_CONTENT_LENGTH = 1024
PREVIEW_RECORD_LIMIT = 5
OUTPUT_DIRECTORY = tmp_path / "dev-outputs"
OUTPUT_RETENTION_HOURS = 24
CLEANUP_ON_STARTUP = False
CLEANUP_AFTER_DOWNLOAD = False
app = create_app(DevelopmentConfig)
assert app.config["SECRET_KEY"] == DEVELOPMENT_SECRET_KEY
def test_create_app_rejects_unsafe_secret_key_outside_development(tmp_path, monkeypatch):
monkeypatch.setenv("APP_ENV", "production")
monkeypatch.delenv("FLASK_ENV", raising=False)
class ProductionConfig:
SECRET_KEY = "change-me"
MAX_CONTENT_LENGTH = 1024
PREVIEW_RECORD_LIMIT = 5
OUTPUT_DIRECTORY = tmp_path / "prod-outputs"
OUTPUT_RETENTION_HOURS = 24
CLEANUP_ON_STARTUP = False
CLEANUP_AFTER_DOWNLOAD = False
with pytest.raises(RuntimeError, match="SECRET_KEY is missing or unsafe"):
create_app(ProductionConfig)
def test_create_app_rejects_missing_secret_key_outside_development(tmp_path, monkeypatch):
monkeypatch.setenv("APP_ENV", "production")
monkeypatch.delenv("FLASK_ENV", raising=False)
class ProductionConfig:
SECRET_KEY = ""
MAX_CONTENT_LENGTH = 1024
PREVIEW_RECORD_LIMIT = 5
OUTPUT_DIRECTORY = tmp_path / "prod-outputs-missing-key"
OUTPUT_RETENTION_HOURS = 24
CLEANUP_ON_STARTUP = False
CLEANUP_AFTER_DOWNLOAD = False
with pytest.raises(RuntimeError, match="SECRET_KEY is missing or unsafe"):
create_app(ProductionConfig)

92
tests/test_parser.py Normal file
View File

@@ -0,0 +1,92 @@
import io
import pytest
from app.services.parser import LogParseError, parse_log_file
def test_parse_log_file_supports_shell_style_quotes():
stream = io.BytesIO(
b'v015xxxxdate=2024-02-15 time=09:10:11 policy="Strict Policy" msg="blocked request"\n'
)
records, union_keys = parse_log_file(stream)
assert records == [
{
"v015xxxxdate": "2024-02-15",
"time": "09:10:11",
"policy": "Strict Policy",
"msg": "blocked request",
}
]
assert union_keys == ["v015xxxxdate", "time", "policy", "msg"]
def test_parse_log_file_rejects_tokens_without_equals():
stream = io.BytesIO(b"v015xxxxdate=2024-02-15 broken-token\n")
with pytest.raises(LogParseError):
parse_log_file(stream)
def test_parse_log_file_supports_utf8_bom():
stream = io.BytesIO(
b'\xef\xbb\xbfv015xxxxdate=2024-02-15 time=09:10:11 msg="blocked request"\n'
)
records, _union_keys = parse_log_file(stream)
assert records[0]["v015xxxxdate"] == "2024-02-15"
def test_parse_log_file_supports_cp1252_text():
stream = io.BytesIO(
'v015xxxxdate=2024-02-15 time=09:10:11 msg="caf\xe9 request"\n'.encode("cp1252")
)
records, _union_keys = parse_log_file(stream)
assert records[0]["msg"] == "cafe request".replace("e", "é", 1)
def test_parse_log_file_tolerates_unterminated_quotes():
stream = io.BytesIO(
b'v015xxxxdate=2024-02-15 time=09:10:11 msg="broken quoted value\n'
)
records, _union_keys = parse_log_file(stream)
assert records[0]["msg"] == "broken quoted value"
def test_parse_log_file_rebuilds_record_after_embedded_newlines():
stream = io.BytesIO(
b'v015xxxxdate=2024-02-15 time=09:10:11 msg="hello\n'
b'broken-fragment\n'
b'world" action=Alert\n'
b'v015xxxxdate=2024-02-15 time=09:10:12 msg="next" action=Monitor\n'
)
records, _union_keys = parse_log_file(stream)
assert len(records) == 2
assert records[0]["msg"] == "hellobroken-fragmentworld"
assert records[0]["action"] == "Alert"
assert records[1]["msg"] == "next"
def test_parse_log_file_does_not_require_full_stream_read():
class NoFullReadBytesIO(io.BytesIO):
def read(self, size=-1):
if size == -1:
raise AssertionError("full stream read should not be used")
return super().read(size)
stream = NoFullReadBytesIO(
b'v015xxxxdate=2024-02-15 time=09:10:11 policy="Strict Policy" msg="blocked request"\n'
)
records, _union_keys = parse_log_file(stream)
assert records[0]["policy"] == "Strict Policy"

108
tests/test_processing.py Normal file
View File

@@ -0,0 +1,108 @@
from app.services.processing import ProcessingOptions, filter_records, sort_records
def test_filter_records_supports_case_insensitive_filters():
records = [
{"policy": "ProdPolicy", "severity_level": "HIGH"},
{"policy": "OtherPolicy", "severity_level": "low"},
]
options = ProcessingOptions(
policy_cs="",
policy_ci="prod",
severity_cs="",
severity_ci="high",
sort_by="datetime",
order="asc",
mode="vendor",
)
filtered = list(filter_records(records, options))
assert filtered == [{"policy": "ProdPolicy", "severity_level": "HIGH"}]
def test_sort_records_by_severity_desc_uses_defined_ranking():
records = [
{"severity_level": "medium"},
{"severity_level": "critical"},
{"severity_level": "info"},
]
options = ProcessingOptions(
policy_cs="",
policy_ci="",
severity_cs="",
severity_ci="",
sort_by="severity",
order="desc",
mode="vendor",
)
sorted_records = sort_records(records, options)
assert [record["severity_level"] for record in sorted_records] == [
"critical",
"medium",
"info",
]
def test_sort_records_by_datetime_asc_places_invalid_records_last():
records = [
{"v015xxxxdate": "2024-05-03", "time": "08:00:00", "msg": "latest-valid"},
{"v015xxxxdate": "", "time": "09:00:00", "msg": "missing-date"},
{"v015xxxxdate": "2024-05-01", "time": "10:00:00", "msg": "earliest-valid"},
{"v015xxxxdate": "2024-05-02", "time": "", "msg": "missing-time"},
{"v015xxxxdate": "bad-date", "time": "99:99:99", "msg": "invalid-datetime"},
{"v015xxxxdate": "2024-05-02", "time": "09:30:00", "msg": "middle-valid"},
]
options = ProcessingOptions(
policy_cs="",
policy_ci="",
severity_cs="",
severity_ci="",
sort_by="datetime",
order="asc",
mode="vendor",
)
sorted_records = sort_records(records, options)
assert [record["msg"] for record in sorted_records] == [
"earliest-valid",
"middle-valid",
"latest-valid",
"missing-date",
"missing-time",
"invalid-datetime",
]
def test_sort_records_by_datetime_desc_places_invalid_records_last():
records = [
{"v015xxxxdate": "2024-05-03", "time": "08:00:00", "msg": "latest-valid"},
{"v015xxxxdate": "", "time": "09:00:00", "msg": "missing-date"},
{"v015xxxxdate": "2024-05-01", "time": "10:00:00", "msg": "earliest-valid"},
{"v015xxxxdate": "2024-05-02", "time": "", "msg": "missing-time"},
{"v015xxxxdate": "bad-date", "time": "99:99:99", "msg": "invalid-datetime"},
{"v015xxxxdate": "2024-05-02", "time": "09:30:00", "msg": "middle-valid"},
]
options = ProcessingOptions(
policy_cs="",
policy_ci="",
severity_cs="",
severity_ci="",
sort_by="datetime",
order="desc",
mode="vendor",
)
sorted_records = sort_records(records, options)
assert [record["msg"] for record in sorted_records] == [
"latest-valid",
"middle-valid",
"earliest-valid",
"missing-date",
"missing-time",
"invalid-datetime",
]

75
tests/test_storage.py Normal file
View File

@@ -0,0 +1,75 @@
import json
import os
from pathlib import Path
from app.services.storage import cleanup_expired_outputs, delete_result_files, persist_result
def test_persist_result_writes_csv_and_collects_preview(tmp_path: Path):
metadata, export_result = persist_result(
output_dir=tmp_path,
records=[
{
"v015xxxxdate": "2024-05-01",
"time": "10:00:00",
"policy": "Prod Policy",
"severity_level": "high",
},
{
"v015xxxxdate": "2024-05-02",
"time": "11:00:00",
"policy": "Other Policy",
"severity_level": "low",
},
],
union_keys=["v015xxxxdate", "time", "policy", "severity_level"],
mode="full",
output_format="csv",
preview_record_limit=1,
)
written = Path(metadata.file_path).read_text(encoding="utf-8")
assert metadata.download_name == "waf-report.csv"
assert "v015xxxxdate,time,policy,severity_level" in written
assert "2024-05-01,10:00:00,Prod Policy,high" in written
assert export_result.preview(1).count("\n") == 1
def test_delete_result_files_removes_output_and_metadata(tmp_path: Path):
result_id = "delete-me"
output_file = tmp_path / f"{result_id}.txt"
metadata_file = tmp_path / f"{result_id}.json"
output_file.write_text("content", encoding="utf-8")
metadata_file.write_text("{}", encoding="utf-8")
delete_result_files(output_dir=tmp_path, result_id=result_id)
assert not output_file.exists()
assert not metadata_file.exists()
def test_cleanup_expired_outputs_removes_only_old_results(tmp_path: Path):
old_result_id = "old-result"
new_result_id = "new-result"
old_output = tmp_path / f"{old_result_id}.csv"
old_metadata = tmp_path / f"{old_result_id}.json"
new_output = tmp_path / f"{new_result_id}.csv"
new_metadata = tmp_path / f"{new_result_id}.json"
old_output.write_text("old", encoding="utf-8")
new_output.write_text("new", encoding="utf-8")
old_metadata.write_text(json.dumps({"result_id": old_result_id, "file_path": str(old_output)}), encoding="utf-8")
new_metadata.write_text(json.dumps({"result_id": new_result_id, "file_path": str(new_output)}), encoding="utf-8")
old_timestamp = 946684800
os.utime(old_output, (old_timestamp, old_timestamp))
os.utime(old_metadata, (old_timestamp, old_timestamp))
deleted_results = cleanup_expired_outputs(output_dir=tmp_path, retention_hours=1)
assert deleted_results == 1
assert not old_output.exists()
assert not old_metadata.exists()
assert new_output.exists()
assert new_metadata.exists()

3
wsgi.py Normal file
View File

@@ -0,0 +1,3 @@
from app import create_app
app = create_app()