Build Flask WAF log converter app
This commit is contained in:
26
tests/conftest.py
Normal file
26
tests/conftest.py
Normal file
@@ -0,0 +1,26 @@
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from app import create_app
|
||||
|
||||
|
||||
class TestConfig:
|
||||
TESTING = True
|
||||
SECRET_KEY = "test-secret"
|
||||
MAX_CONTENT_LENGTH = 1024 * 1024
|
||||
PREVIEW_RECORD_LIMIT = 5
|
||||
OUTPUT_DIRECTORY = "test-outputs"
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def app():
|
||||
flask_app = create_app(TestConfig)
|
||||
yield flask_app
|
||||
shutil.rmtree(Path(flask_app.instance_path) / "test-outputs", ignore_errors=True)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def client(app):
|
||||
return app.test_client()
|
||||
118
tests/test_app.py
Normal file
118
tests/test_app.py
Normal file
@@ -0,0 +1,118 @@
|
||||
import io
|
||||
|
||||
|
||||
SAMPLE_LOG = (
|
||||
'v015xxxxdate=2024-05-01 time=10:00:00 policy="Prod Policy" '
|
||||
'http_method=GET http_host=example.com http_url="/login" '
|
||||
'http_refer="https://ref.example" service=edge backend_service=api '
|
||||
'msg="SQL injection blocked" signature_subclass=SQL signature_id=942100 '
|
||||
'owasp_top10=A03 match_location=body action=blocked severity_level=high\n'
|
||||
'v015xxxxdate=2024-05-02 time=11:00:00 policy="Prod Policy" '
|
||||
'http_method=POST http_host=example.com http_url="/checkout" '
|
||||
'http_refer="https://shop.example" service=edge backend_service=orders '
|
||||
'msg="XSS blocked" signature_subclass=XSS signature_id=941100 '
|
||||
'owasp_top10=A03 match_location=query action=monitored severity_level=medium\n'
|
||||
)
|
||||
|
||||
|
||||
def test_index_page_loads(client):
|
||||
response = client.get("/")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert b"WAF Log Converter" in response.data
|
||||
|
||||
|
||||
def test_convert_returns_text_preview_and_download_link(client):
|
||||
response = client.post(
|
||||
"/convert",
|
||||
data={
|
||||
"mode": "vendor",
|
||||
"output_format": "text",
|
||||
"sort_by": "severity",
|
||||
"order": "desc",
|
||||
"policy_cs": "",
|
||||
"policy_ci": "prod",
|
||||
"severity_cs": "",
|
||||
"severity_ci": "",
|
||||
"log_file": (io.BytesIO(SAMPLE_LOG.encode("utf-8")), "sample.log"),
|
||||
},
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert b"Download export" in response.data
|
||||
assert b"--- record 1 ---" in response.data
|
||||
|
||||
|
||||
def test_convert_full_mode_csv_preserves_union_order(client):
|
||||
response = client.post(
|
||||
"/convert",
|
||||
data={
|
||||
"mode": "full",
|
||||
"output_format": "csv",
|
||||
"sort_by": "datetime",
|
||||
"order": "asc",
|
||||
"policy_cs": "",
|
||||
"policy_ci": "",
|
||||
"severity_cs": "",
|
||||
"severity_ci": "",
|
||||
"log_file": (io.BytesIO(SAMPLE_LOG.encode("utf-8")), "sample.log"),
|
||||
},
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert b"TEXT" not in response.data
|
||||
assert b"Download export" in response.data
|
||||
|
||||
|
||||
def test_convert_rejects_mutually_exclusive_filters(client):
|
||||
response = client.post(
|
||||
"/convert",
|
||||
data={
|
||||
"mode": "vendor",
|
||||
"output_format": "csv",
|
||||
"sort_by": "datetime",
|
||||
"order": "asc",
|
||||
"policy_cs": "A",
|
||||
"policy_ci": "a",
|
||||
"severity_cs": "",
|
||||
"severity_ci": "",
|
||||
"log_file": (io.BytesIO(SAMPLE_LOG.encode("utf-8")), "sample.log"),
|
||||
},
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
assert b"Policy filter must use either case-sensitive or case-insensitive match" in response.data
|
||||
|
||||
|
||||
def test_download_route_returns_generated_file(client):
|
||||
convert_response = client.post(
|
||||
"/convert",
|
||||
data={
|
||||
"mode": "vendor",
|
||||
"output_format": "csv",
|
||||
"sort_by": "datetime",
|
||||
"order": "asc",
|
||||
"policy_cs": "",
|
||||
"policy_ci": "",
|
||||
"severity_cs": "",
|
||||
"severity_ci": "",
|
||||
"log_file": (io.BytesIO(SAMPLE_LOG.encode("utf-8")), "sample.log"),
|
||||
},
|
||||
content_type="multipart/form-data",
|
||||
)
|
||||
|
||||
html = convert_response.data.decode("utf-8")
|
||||
marker = '/download/'
|
||||
start = html.index(marker) + len(marker)
|
||||
end = html.index('"', start)
|
||||
result_id = html[start:end]
|
||||
|
||||
download_response = client.get(f"/download/{result_id}")
|
||||
|
||||
assert download_response.status_code == 200
|
||||
assert download_response.headers["Content-Type"].startswith("text/csv")
|
||||
assert b"v015xxxxdate,time,policy" in download_response.data
|
||||
download_response.close()
|
||||
30
tests/test_parser.py
Normal file
30
tests/test_parser.py
Normal file
@@ -0,0 +1,30 @@
|
||||
import io
|
||||
|
||||
import pytest
|
||||
|
||||
from app.services.parser import LogParseError, parse_log_file
|
||||
|
||||
|
||||
def test_parse_log_file_supports_shell_style_quotes():
|
||||
stream = io.BytesIO(
|
||||
b'v015xxxxdate=2024-02-15 time=09:10:11 policy="Strict Policy" msg="blocked request"\n'
|
||||
)
|
||||
|
||||
records, union_keys = parse_log_file(stream)
|
||||
|
||||
assert records == [
|
||||
{
|
||||
"v015xxxxdate": "2024-02-15",
|
||||
"time": "09:10:11",
|
||||
"policy": "Strict Policy",
|
||||
"msg": "blocked request",
|
||||
}
|
||||
]
|
||||
assert union_keys == ["v015xxxxdate", "time", "policy", "msg"]
|
||||
|
||||
|
||||
def test_parse_log_file_rejects_tokens_without_equals():
|
||||
stream = io.BytesIO(b"v015xxxxdate=2024-02-15 broken-token\n")
|
||||
|
||||
with pytest.raises(LogParseError):
|
||||
parse_log_file(stream)
|
||||
46
tests/test_processing.py
Normal file
46
tests/test_processing.py
Normal file
@@ -0,0 +1,46 @@
|
||||
from app.services.processing import ProcessingOptions, filter_records, sort_records
|
||||
|
||||
|
||||
def test_filter_records_supports_case_insensitive_filters():
|
||||
records = [
|
||||
{"policy": "ProdPolicy", "severity_level": "HIGH"},
|
||||
{"policy": "OtherPolicy", "severity_level": "low"},
|
||||
]
|
||||
options = ProcessingOptions(
|
||||
policy_cs="",
|
||||
policy_ci="prod",
|
||||
severity_cs="",
|
||||
severity_ci="high",
|
||||
sort_by="datetime",
|
||||
order="asc",
|
||||
mode="vendor",
|
||||
)
|
||||
|
||||
filtered = filter_records(records, options)
|
||||
|
||||
assert filtered == [{"policy": "ProdPolicy", "severity_level": "HIGH"}]
|
||||
|
||||
|
||||
def test_sort_records_by_severity_desc_uses_defined_ranking():
|
||||
records = [
|
||||
{"severity_level": "medium"},
|
||||
{"severity_level": "critical"},
|
||||
{"severity_level": "info"},
|
||||
]
|
||||
options = ProcessingOptions(
|
||||
policy_cs="",
|
||||
policy_ci="",
|
||||
severity_cs="",
|
||||
severity_ci="",
|
||||
sort_by="severity",
|
||||
order="desc",
|
||||
mode="vendor",
|
||||
)
|
||||
|
||||
sorted_records = sort_records(records, options)
|
||||
|
||||
assert [record["severity_level"] for record in sorted_records] == [
|
||||
"critical",
|
||||
"medium",
|
||||
"info",
|
||||
]
|
||||
Reference in New Issue
Block a user