Files
webfortilog/tests/test_app.py
2026-04-27 14:17:44 +02:00

262 lines
7.9 KiB
Python

import io
import json
from pathlib import Path
from app import create_app
# Two sample log records used as upload payload by the tests in this module.
# Each record is a run of space-separated key=value fields; every record is
# terminated by a newline when assembled into SAMPLE_LOG.
_SAMPLE_RECORDS = (
    (
        "v015xxxxdate=2024-05-01",
        "time=10:00:00",
        'policy="Prod Policy"',
        "http_method=GET",
        "http_host=example.com",
        'http_url="/login"',
        'http_refer="https://ref.example"',
        "service=edge",
        "backend_service=api",
        'msg="SQL injection blocked"',
        "signature_subclass=SQL",
        "signature_id=942100",
        "owasp_top10=A03",
        "match_location=body",
        "action=blocked",
        "severity_level=high",
    ),
    (
        "v015xxxxdate=2024-05-02",
        "time=11:00:00",
        'policy="Prod Policy"',
        "http_method=POST",
        "http_host=example.com",
        'http_url="/checkout"',
        'http_refer="https://shop.example"',
        "service=edge",
        "backend_service=orders",
        'msg="XSS blocked"',
        "signature_subclass=XSS",
        "signature_id=941100",
        "owasp_top10=A03",
        "match_location=query",
        "action=monitored",
        "severity_level=medium",
    ),
)
SAMPLE_LOG = "".join(" ".join(fields) + "\n" for fields in _SAMPLE_RECORDS)
def test_index_page_loads(client):
    """GET ``/`` answers 200 and the body contains ``WAF Log Converter``."""
    landing = client.get("/")

    assert landing.status_code == 200
    body = landing.data
    assert b"WAF Log Converter" in body
def test_convert_returns_text_preview_and_download_link(client):
    """Text conversion renders a record preview and a download link.

    Uploads the sample log to ``/convert`` with ``output_format=text`` and a
    ``policy_ci`` filter of ``prod``, then checks the returned page for the
    ``Download export`` text and the first record header.
    """
    upload = io.BytesIO(SAMPLE_LOG.encode("utf-8"))
    form = {
        "mode": "vendor",
        "output_format": "text",
        "sort_by": "severity",
        "order": "desc",
        "policy_cs": "",
        "policy_ci": "prod",
        "severity_cs": "",
        "severity_ci": "",
        "log_file": (upload, "sample.log"),
    }

    result = client.post("/convert", data=form, content_type="multipart/form-data")
    upload.close()

    assert result.status_code == 200
    for expected in (b"Download export", b"--- record 1 ---"):
        assert expected in result.data
    result.close()
def test_convert_full_mode_csv_preserves_union_order(client):
    """Full-mode CSV conversion succeeds and offers a download link.

    Uploads the sample log to ``/convert`` with ``mode=full`` and
    ``output_format=csv`` and checks that the page contains
    ``Download export`` and does not contain the bytes ``TEXT``.
    """
    # NOTE(review): the test name mentions "union order", but nothing below
    # inspects column/field ordering -- consider asserting on the CSV header
    # once the expected order is confirmed.
    upload = io.BytesIO(SAMPLE_LOG.encode("utf-8"))
    form = {
        "mode": "full",
        "output_format": "csv",
        "sort_by": "datetime",
        "order": "asc",
        "policy_cs": "",
        "policy_ci": "",
        "severity_cs": "",
        "severity_ci": "",
        "log_file": (upload, "sample.log"),
    }

    result = client.post("/convert", data=form, content_type="multipart/form-data")
    upload.close()

    assert result.status_code == 200
    assert b"TEXT" not in result.data
    assert b"Download export" in result.data
    result.close()
def test_convert_rejects_mutually_exclusive_filters(client):
    """Supplying both ``policy_cs`` and ``policy_ci`` is rejected with 400.

    The response body must carry the message explaining that the policy
    filter accepts only one of the two match styles.
    """
    upload = io.BytesIO(SAMPLE_LOG.encode("utf-8"))
    form = {
        "mode": "vendor",
        "output_format": "csv",
        "sort_by": "datetime",
        "order": "asc",
        # Both policy filters are filled in on purpose.
        "policy_cs": "A",
        "policy_ci": "a",
        "severity_cs": "",
        "severity_ci": "",
        "log_file": (upload, "sample.log"),
    }

    result = client.post("/convert", data=form, content_type="multipart/form-data")
    upload.close()

    assert result.status_code == 400
    expected_message = (
        b"Policy filter must use either case-sensitive or case-insensitive match"
    )
    assert expected_message in result.data
    result.close()
def test_download_route_returns_generated_file(client):
    """A converted CSV can be fetched back through ``/download/<result_id>``.

    Converts the sample log to CSV, extracts the result id from the first
    ``/download/`` link in the returned page, then downloads it and checks
    the status, the content type, and the start of the CSV header.
    """
    upload = io.BytesIO(SAMPLE_LOG.encode("utf-8"))
    form = {
        "mode": "vendor",
        "output_format": "csv",
        "sort_by": "datetime",
        "order": "asc",
        "policy_cs": "",
        "policy_ci": "",
        "severity_cs": "",
        "severity_ci": "",
        "log_file": (upload, "sample.log"),
    }
    converted = client.post("/convert", data=form, content_type="multipart/form-data")
    upload.close()

    # The result id is the text between "/download/" and the next double quote.
    page = converted.data.decode("utf-8")
    prefix = "/download/"
    id_start = page.index(prefix) + len(prefix)
    id_end = page.index('"', id_start)
    result_id = page[id_start:id_end]

    downloaded = client.get(f"/download/{result_id}")
    assert downloaded.status_code == 200
    content_type = downloaded.headers["Content-Type"]
    assert content_type.startswith("text/csv")
    assert b"v015xxxxdate,time,policy" in downloaded.data
    converted.close()
    downloaded.close()
def test_download_route_can_cleanup_files_after_download(tmp_path):
    """With ``CLEANUP_AFTER_DOWNLOAD`` on, downloading removes the metadata file.

    Builds an app whose output directory lives under ``tmp_path``, converts
    the sample log to CSV, downloads the result once, and checks that the
    ``<result_id>.json`` file in the output directory is gone afterwards.

    The test also verifies that the metadata file existed before the download
    and that the download itself succeeded; without those checks the final
    assertion would pass even if nothing had ever been written or the
    download route had failed.
    """

    class CleanupAfterDownloadConfig:
        TESTING = True
        SECRET_KEY = "test-secret"
        MAX_CONTENT_LENGTH = 100 * 1024 * 1024
        PREVIEW_RECORD_LIMIT = 5
        OUTPUT_DIRECTORY = tmp_path / "download-cleanup-outputs"
        OUTPUT_RETENTION_HOURS = 24
        CLEANUP_ON_STARTUP = False
        CLEANUP_AFTER_DOWNLOAD = True

    app = create_app(CleanupAfterDownloadConfig)
    client = app.test_client()
    log_file = io.BytesIO(SAMPLE_LOG.encode("utf-8"))
    convert_response = client.post(
        "/convert",
        data={
            "mode": "vendor",
            "output_format": "csv",
            "sort_by": "datetime",
            "order": "asc",
            "policy_cs": "",
            "policy_ci": "",
            "severity_cs": "",
            "severity_ci": "",
            "log_file": (log_file, "sample.log"),
        },
        content_type="multipart/form-data",
    )
    log_file.close()

    # The result id is the text between "/download/" and the next double quote.
    html = convert_response.data.decode("utf-8")
    marker = "/download/"
    start = html.index(marker) + len(marker)
    end = html.index('"', start)
    result_id = html[start:end]

    metadata_path = Path(app.config["OUTPUT_DIRECTORY"]) / f"{result_id}.json"
    # Precondition: the file must exist first, otherwise "not exists" below
    # proves nothing about cleanup.
    assert metadata_path.exists()

    download_response = client.get(f"/download/{result_id}")
    download_status = download_response.status_code
    # Close both responses before checking the filesystem, as the original
    # test did.
    download_response.close()
    convert_response.close()

    assert download_status == 200
    assert not metadata_path.exists()
def test_cleanup_on_startup_removes_expired_outputs(tmp_path):
    """With ``CLEANUP_ON_STARTUP`` on, ``create_app`` removes expired outputs.

    Seeds the output directory with a CSV file and its JSON metadata, backdates
    both files far beyond the one-hour retention configured below, creates the
    app, and expects both files to be gone.
    """
    import os  # kept function-local, as in the original test

    output_dir = tmp_path / "startup-cleanup-outputs"
    output_dir.mkdir(parents=True)
    result_id = "expired-result"
    file_path = output_dir / f"{result_id}.csv"
    metadata_path = output_dir / f"{result_id}.json"

    # write_text() creates the files, so no separate touch() is needed.
    file_path.write_text("header\nvalue\n", encoding="utf-8")
    metadata_path.write_text(
        json.dumps(
            {
                "result_id": result_id,
                "file_path": str(file_path),
                "download_name": "waf-report.csv",
                "mimetype": "text/csv; charset=utf-8",
            }
        ),
        encoding="utf-8",
    )

    # 946684800 is 2000-01-01T00:00:00Z; set both atime and mtime to it.
    old_timestamp = 946684800
    os.utime(file_path, (old_timestamp, old_timestamp))
    os.utime(metadata_path, (old_timestamp, old_timestamp))

    class StartupCleanupConfig:
        TESTING = True
        SECRET_KEY = "test-secret"
        MAX_CONTENT_LENGTH = 100 * 1024 * 1024
        PREVIEW_RECORD_LIMIT = 5
        OUTPUT_DIRECTORY = output_dir
        OUTPUT_RETENTION_HOURS = 1
        CLEANUP_ON_STARTUP = True
        CLEANUP_AFTER_DOWNLOAD = False

    create_app(StartupCleanupConfig)

    assert not file_path.exists()
    assert not metadata_path.exists()
def test_default_upload_limit_is_100_mib(app):
    """The ``app`` fixture's ``MAX_CONTENT_LENGTH`` equals 100 MiB in bytes."""
    one_hundred_mib = 100 * 1024 * 1024
    configured_limit = app.config["MAX_CONTENT_LENGTH"]
    assert configured_limit == one_hundred_mib
def test_too_large_upload_returns_friendly_message(tmp_path):
    """An upload over ``MAX_CONTENT_LENGTH`` yields 413 with a readable message.

    Builds an app limited to 128 bytes, posts the sample log to ``/convert``,
    and checks both the status code and the size hint in the response body.
    """

    class SmallLimitConfig:
        TESTING = True
        SECRET_KEY = "test-secret"
        MAX_CONTENT_LENGTH = 128
        PREVIEW_RECORD_LIMIT = 5
        OUTPUT_DIRECTORY = tmp_path / "tiny-limit-outputs"

    tiny_app = create_app(SmallLimitConfig)
    http = tiny_app.test_client()

    upload = io.BytesIO(SAMPLE_LOG.encode("utf-8"))
    form = {
        "mode": "vendor",
        "output_format": "text",
        "sort_by": "datetime",
        "order": "asc",
        "policy_cs": "",
        "policy_ci": "",
        "severity_cs": "",
        "severity_ci": "",
        "log_file": (upload, "sample.log"),
    }
    result = http.post("/convert", data=form, content_type="multipart/form-data")
    upload.close()

    assert result.status_code == 413
    assert b"Maximum allowed size is 128 bytes." in result.data
    result.close()