import io import json from pathlib import Path from app import create_app SAMPLE_LOG = ( 'v015xxxxdate=2024-05-01 time=10:00:00 policy="Prod Policy" ' 'http_method=GET http_host=example.com http_url="/login" ' 'http_refer="https://ref.example" service=edge backend_service=api ' 'msg="SQL injection blocked" signature_subclass=SQL signature_id=942100 ' 'owasp_top10=A03 match_location=body action=blocked severity_level=high\n' 'v015xxxxdate=2024-05-02 time=11:00:00 policy="Prod Policy" ' 'http_method=POST http_host=example.com http_url="/checkout" ' 'http_refer="https://shop.example" service=edge backend_service=orders ' 'msg="XSS blocked" signature_subclass=XSS signature_id=941100 ' 'owasp_top10=A03 match_location=query action=monitored severity_level=medium\n' ) def test_index_page_loads(client): response = client.get("/") assert response.status_code == 200 assert b"WAF Log Converter" in response.data def test_convert_returns_text_preview_and_download_link(client): log_file = io.BytesIO(SAMPLE_LOG.encode("utf-8")) response = client.post( "/convert", data={ "mode": "vendor", "output_format": "text", "sort_by": "severity", "order": "desc", "policy_cs": "", "policy_ci": "prod", "severity_cs": "", "severity_ci": "", "log_file": (log_file, "sample.log"), }, content_type="multipart/form-data", ) log_file.close() assert response.status_code == 200 assert b"Download export" in response.data assert b"--- record 1 ---" in response.data response.close() def test_convert_full_mode_csv_preserves_union_order(client): log_file = io.BytesIO(SAMPLE_LOG.encode("utf-8")) response = client.post( "/convert", data={ "mode": "full", "output_format": "csv", "sort_by": "datetime", "order": "asc", "policy_cs": "", "policy_ci": "", "severity_cs": "", "severity_ci": "", "log_file": (log_file, "sample.log"), }, content_type="multipart/form-data", ) log_file.close() assert response.status_code == 200 assert b"TEXT" not in response.data assert b"Download export" in response.data response.close() def test_convert_rejects_mutually_exclusive_filters(client): log_file = io.BytesIO(SAMPLE_LOG.encode("utf-8")) response = client.post( "/convert", data={ "mode": "vendor", "output_format": "csv", "sort_by": "datetime", "order": "asc", "policy_cs": "A", "policy_ci": "a", "severity_cs": "", "severity_ci": "", "log_file": (log_file, "sample.log"), }, content_type="multipart/form-data", ) log_file.close() assert response.status_code == 400 assert b"Policy filter must use either case-sensitive or case-insensitive match" in response.data response.close() def test_download_route_returns_generated_file(client): log_file = io.BytesIO(SAMPLE_LOG.encode("utf-8")) convert_response = client.post( "/convert", data={ "mode": "vendor", "output_format": "csv", "sort_by": "datetime", "order": "asc", "policy_cs": "", "policy_ci": "", "severity_cs": "", "severity_ci": "", "log_file": (log_file, "sample.log"), }, content_type="multipart/form-data", ) log_file.close() html = convert_response.data.decode("utf-8") marker = '/download/' start = html.index(marker) + len(marker) end = html.index('"', start) result_id = html[start:end] download_response = client.get(f"/download/{result_id}") assert download_response.status_code == 200 assert download_response.headers["Content-Type"].startswith("text/csv") assert b"v015xxxxdate,time,policy" in download_response.data convert_response.close() download_response.close() def test_download_route_can_cleanup_files_after_download(tmp_path): class CleanupAfterDownloadConfig: TESTING = True SECRET_KEY = "test-secret" MAX_CONTENT_LENGTH = 100 * 1024 * 1024 PREVIEW_RECORD_LIMIT = 5 OUTPUT_DIRECTORY = tmp_path / "download-cleanup-outputs" OUTPUT_RETENTION_HOURS = 24 CLEANUP_ON_STARTUP = False CLEANUP_AFTER_DOWNLOAD = True app = create_app(CleanupAfterDownloadConfig) client = app.test_client() log_file = io.BytesIO(SAMPLE_LOG.encode("utf-8")) convert_response = client.post( "/convert", data={ "mode": "vendor", "output_format": "csv", "sort_by": "datetime", "order": "asc", "policy_cs": "", "policy_ci": "", "severity_cs": "", "severity_ci": "", "log_file": (log_file, "sample.log"), }, content_type="multipart/form-data", ) log_file.close() html = convert_response.data.decode("utf-8") marker = "/download/" start = html.index(marker) + len(marker) end = html.index('"', start) result_id = html[start:end] metadata_path = Path(app.config["OUTPUT_DIRECTORY"]) / f"{result_id}.json" download_response = client.get(f"/download/{result_id}") download_response.close() convert_response.close() assert not metadata_path.exists() def test_cleanup_on_startup_removes_expired_outputs(tmp_path): output_dir = tmp_path / "startup-cleanup-outputs" output_dir.mkdir(parents=True) result_id = "expired-result" file_path = output_dir / f"{result_id}.csv" metadata_path = output_dir / f"{result_id}.json" file_path.write_text("header\nvalue\n", encoding="utf-8") metadata_path.write_text( json.dumps( { "result_id": result_id, "file_path": str(file_path), "download_name": "waf-report.csv", "mimetype": "text/csv; charset=utf-8", } ), encoding="utf-8", ) old_timestamp = 946684800 file_path.touch() metadata_path.touch() Path(file_path).touch() import os os.utime(file_path, (old_timestamp, old_timestamp)) os.utime(metadata_path, (old_timestamp, old_timestamp)) class StartupCleanupConfig: TESTING = True SECRET_KEY = "test-secret" MAX_CONTENT_LENGTH = 100 * 1024 * 1024 PREVIEW_RECORD_LIMIT = 5 OUTPUT_DIRECTORY = output_dir OUTPUT_RETENTION_HOURS = 1 CLEANUP_ON_STARTUP = True CLEANUP_AFTER_DOWNLOAD = False create_app(StartupCleanupConfig) assert not file_path.exists() assert not metadata_path.exists() def test_default_upload_limit_is_100_mib(app): assert app.config["MAX_CONTENT_LENGTH"] == 100 * 1024 * 1024 def test_too_large_upload_returns_friendly_message(tmp_path): class SmallLimitConfig: TESTING = True SECRET_KEY = "test-secret" MAX_CONTENT_LENGTH = 128 PREVIEW_RECORD_LIMIT = 5 OUTPUT_DIRECTORY = tmp_path / "tiny-limit-outputs" app = create_app(SmallLimitConfig) client = app.test_client() log_file = io.BytesIO(SAMPLE_LOG.encode("utf-8")) response = client.post( "/convert", data={ "mode": "vendor", "output_format": "text", "sort_by": "datetime", "order": "asc", "policy_cs": "", "policy_ci": "", "severity_cs": "", "severity_ci": "", "log_file": (log_file, "sample.log"), }, content_type="multipart/form-data", ) log_file.close() assert response.status_code == 413 assert b"Maximum allowed size is 128 bytes." in response.data response.close()