from dataclasses import dataclass
from pathlib import Path

from flask import (
    Blueprint,
    current_app,
    flash,
    redirect,
    render_template,
    request,
    send_file,
    url_for,
)
from werkzeug.datastructures import FileStorage
from werkzeug.wsgi import ClosingIterator

from app.constants import MODES, OUTPUT_FORMATS, SORTABLE_FIELDS, SORT_ORDERS
from app.services.conversion import convert_uploaded_log
from app.services.parser import LogParseError
from app.services.processing import ProcessingError, ProcessingOptions
from app.services.storage import delete_result_files, load_result_metadata

main_blueprint = Blueprint("main", __name__)


@dataclass(slots=True)
class FormData:
    """Whitespace-stripped values of the conversion form fields."""

    mode: str
    output_format: str
    sort_by: str
    order: str
    # Each filter has a case-sensitive (``_cs``) and a case-insensitive
    # (``_ci``) variant; validation rejects a submission that sets both.
    policy_cs: str
    policy_ci: str
    severity_cs: str
    severity_ci: str


def _normalize_form() -> FormData:
    """Build a ``FormData`` from the current request's form fields.

    Missing fields fall back to their defaults; every value is stripped of
    surrounding whitespace.
    """

    def field(name: str, default: str = "") -> str:
        return request.form.get(name, default).strip()

    return FormData(
        mode=field("mode", "vendor"),
        output_format=field("output_format", "text"),
        sort_by=field("sort_by", "datetime"),
        order=field("order", "asc"),
        policy_cs=field("policy_cs"),
        policy_ci=field("policy_ci"),
        severity_cs=field("severity_cs"),
        severity_ci=field("severity_ci"),
    )


def _validate_form(file: FileStorage | None, form: FormData) -> list[str]:
    """Return user-facing error messages for an invalid submission.

    Args:
        file: The uploaded file, or ``None`` when no file part was sent.
        form: The normalized form values to check.

    Returns:
        A list of error messages in a fixed order; empty when the
        submission is valid.
    """
    errors: list[str] = []

    if file is None or not file.filename:
        errors.append("Please choose a log file to upload.")

    # (submitted value, allowed values, message) - checked in this order so
    # the messages always appear in the same sequence.
    choice_checks = (
        (form.mode, MODES, "Invalid mode selection."),
        (form.output_format, OUTPUT_FORMATS, "Invalid output format selection."),
        (form.sort_by, SORTABLE_FIELDS, "Invalid sort field selection."),
        (form.order, SORT_ORDERS, "Invalid sort order selection."),
    )
    errors.extend(
        message for value, allowed, message in choice_checks if value not in allowed
    )

    if form.policy_cs and form.policy_ci:
        errors.append(
            "Policy filter must use either case-sensitive or case-insensitive match, not both."
        )
    if form.severity_cs and form.severity_ci:
        errors.append(
            "Severity filter must use either case-sensitive or case-insensitive match, not both."
        )

    return errors


def _render_form_errors(form: FormData, *messages: str):
    """Flash each message as ``danger`` and re-render the form with HTTP 400."""
    for message in messages:
        flash(message, "danger")
    return render_template("index.html", form=form), 400


@main_blueprint.get("/")
def index():
    """Render the upload form."""
    return render_template("index.html")


@main_blueprint.post("/convert")
def convert():
    """Validate the submitted form, convert the uploaded log, show the result.

    On validation errors, ``LogParseError``/``ProcessingError`` or a
    ``UnicodeDecodeError`` the form is re-rendered with flashed messages and
    HTTP 400; otherwise ``result.html`` is rendered.
    """
    uploaded_file = request.files.get("log_file")
    form = _normalize_form()
    errors = _validate_form(uploaded_file, form)

    # ``_validate_form`` always reports a missing file, so the explicit
    # ``is None`` test never fires on its own; it narrows the type without
    # relying on ``assert`` (which is stripped under ``python -O``).
    if errors or uploaded_file is None:
        return _render_form_errors(form, *errors)

    try:
        options = ProcessingOptions(
            policy_cs=form.policy_cs,
            policy_ci=form.policy_ci,
            severity_cs=form.severity_cs,
            severity_ci=form.severity_ci,
            sort_by=form.sort_by,
            order=form.order,
            mode=form.mode,
        )
        conversion_result = convert_uploaded_log(
            stream=uploaded_file.stream,
            options=options,
            output_dir=current_app.config["OUTPUT_DIRECTORY"],
            output_format=form.output_format,
            preview_record_limit=current_app.config["PREVIEW_RECORD_LIMIT"],
        )
    except (LogParseError, ProcessingError) as exc:
        return _render_form_errors(form, str(exc))
    except UnicodeDecodeError:
        return _render_form_errors(
            form,
            "The uploaded file could not be decoded. "
            "Supported encodings are UTF-8, UTF-8 with BOM, Windows-1252, and Latin-1.",
        )

    return render_template(
        "result.html",
        result_id=conversion_result.metadata.result_id,
        preview_text=conversion_result.export_result.preview(
            current_app.config["PREVIEW_RECORD_LIMIT"]
        ),
        output_format=form.output_format,
        record_count=conversion_result.filtered_count,
        parsed_count=conversion_result.parsed_count,
        filtered_count=conversion_result.filtered_count,
        mode=form.mode,
        sort_by=form.sort_by,
        order=form.order,
    )


# The URL variable must be declared in the rule: Flask passes only URL
# variables to the view, so without ``<result_id>`` every request would fail
# with a missing-argument ``TypeError``.
@main_blueprint.get("/download/<result_id>")
def download(result_id: str):
    """Send the stored output file for ``result_id`` as an attachment.

    Args:
        result_id: Identifier taken from the URL and used to look up the
            result metadata in the configured output directory.

    Returns:
        The file response, or a redirect to the index page (with a flashed
        error) when no metadata is found for ``result_id``.
    """
    output_dir = current_app.config["OUTPUT_DIRECTORY"]
    metadata = load_result_metadata(output_dir, result_id)
    if metadata is None:
        flash("Requested output file could not be found.", "danger")
        return redirect(url_for("main.index"))

    response = send_file(
        Path(metadata["file_path"]),
        as_attachment=True,
        download_name=metadata["download_name"],
        mimetype=metadata["mimetype"],
        max_age=0,
    )

    if current_app.config.get("CLEANUP_AFTER_DOWNLOAD", False):
        # Attach the cleanup to the body iterable itself so it runs when the
        # iterable is closed, i.e. after the file has been served.
        def _cleanup() -> None:
            delete_result_files(output_dir=output_dir, result_id=result_id)

        response.response = ClosingIterator(response.response, [_cleanup])

    return response