Files
webfortilog/app/routes.py
2026-04-24 14:40:32 +02:00

151 lines
4.7 KiB
Python

from dataclasses import dataclass
from pathlib import Path
from flask import (
Blueprint,
current_app,
flash,
redirect,
render_template,
request,
send_file,
url_for,
)
from werkzeug.datastructures import FileStorage
from app.constants import MODES, OUTPUT_FORMATS, SORTABLE_FIELDS, SORT_ORDERS
from app.services.exporter import build_export
from app.services.parser import LogParseError, parse_log_file
from app.services.processing import (
ProcessingError,
ProcessingOptions,
filter_records,
sort_records,
)
from app.services.storage import load_result_metadata, persist_result
main_blueprint = Blueprint("main", __name__)
@dataclass(slots=True)
class FormData:
mode: str
output_format: str
sort_by: str
order: str
policy_cs: str
policy_ci: str
severity_cs: str
severity_ci: str
def _normalize_form() -> FormData:
return FormData(
mode=request.form.get("mode", "vendor").strip(),
output_format=request.form.get("output_format", "text").strip(),
sort_by=request.form.get("sort_by", "datetime").strip(),
order=request.form.get("order", "asc").strip(),
policy_cs=request.form.get("policy_cs", "").strip(),
policy_ci=request.form.get("policy_ci", "").strip(),
severity_cs=request.form.get("severity_cs", "").strip(),
severity_ci=request.form.get("severity_ci", "").strip(),
)
def _validate_form(file: FileStorage | None, form: FormData) -> list[str]:
errors: list[str] = []
if file is None or not file.filename:
errors.append("Please choose a log file to upload.")
if form.mode not in MODES:
errors.append("Invalid mode selection.")
if form.output_format not in OUTPUT_FORMATS:
errors.append("Invalid output format selection.")
if form.sort_by not in SORTABLE_FIELDS:
errors.append("Invalid sort field selection.")
if form.order not in SORT_ORDERS:
errors.append("Invalid sort order selection.")
if form.policy_cs and form.policy_ci:
errors.append(
"Policy filter must use either case-sensitive or case-insensitive match, not both."
)
if form.severity_cs and form.severity_ci:
errors.append(
"Severity filter must use either case-sensitive or case-insensitive match, not both."
)
return errors
@main_blueprint.get("/")
def index():
return render_template("index.html")
@main_blueprint.post("/convert")
def convert():
uploaded_file = request.files.get("log_file")
form = _normalize_form()
errors = _validate_form(uploaded_file, form)
if errors:
for error in errors:
flash(error, "danger")
return render_template("index.html", form=form), 400
assert uploaded_file is not None
try:
records, union_keys = parse_log_file(uploaded_file.stream)
options = ProcessingOptions(
policy_cs=form.policy_cs,
policy_ci=form.policy_ci,
severity_cs=form.severity_cs,
severity_ci=form.severity_ci,
sort_by=form.sort_by,
order=form.order,
mode=form.mode,
)
filtered_records = filter_records(records, options)
sorted_records = sort_records(filtered_records, options)
export_result = build_export(sorted_records, union_keys, form.mode, form.output_format)
metadata = persist_result(
output_dir=current_app.config["OUTPUT_DIRECTORY"],
export_result=export_result,
)
except (LogParseError, ProcessingError) as exc:
flash(str(exc), "danger")
return render_template("index.html", form=form), 400
except UnicodeDecodeError:
flash("The uploaded file is not valid UTF-8 text.", "danger")
return render_template("index.html", form=form), 400
preview_limit = current_app.config["PREVIEW_RECORD_LIMIT"]
return render_template(
"result.html",
result_id=metadata.result_id,
preview_text=export_result.preview(preview_limit),
output_format=form.output_format,
record_count=len(sorted_records),
parsed_count=len(records),
filtered_count=len(sorted_records),
mode=form.mode,
sort_by=form.sort_by,
order=form.order,
)
@main_blueprint.get("/download/<result_id>")
def download(result_id: str):
metadata = load_result_metadata(current_app.config["OUTPUT_DIRECTORY"], result_id)
if metadata is None:
flash("Requested output file could not be found.", "danger")
return redirect(url_for("main.index"))
return send_file(
Path(metadata["file_path"]),
as_attachment=True,
download_name=metadata["download_name"],
mimetype=metadata["mimetype"],
max_age=0,
)