"""Tests for ``parse_log_file`` covering quoting, encodings and broken input."""

import io

import pytest

from app.services.parser import LogParseError, parse_log_file


def test_parse_log_file_supports_shell_style_quotes():
    """Double-quoted values containing spaces parse as one unquoted value."""
    stream = io.BytesIO(
        b'v015xxxxdate=2024-02-15 time=09:10:11 policy="Strict Policy" msg="blocked request"\n'
    )

    records, union_keys = parse_log_file(stream)

    assert records == [
        {
            "v015xxxxdate": "2024-02-15",
            "time": "09:10:11",
            "policy": "Strict Policy",
            "msg": "blocked request",
        }
    ]
    # Keys are expected in the order they appear on the input line.
    assert union_keys == ["v015xxxxdate", "time", "policy", "msg"]


def test_parse_log_file_rejects_tokens_without_equals():
    """A bare token with no ``=`` separator raises ``LogParseError``."""
    stream = io.BytesIO(b"v015xxxxdate=2024-02-15 broken-token\n")

    with pytest.raises(LogParseError):
        parse_log_file(stream)


def test_parse_log_file_supports_utf8_bom():
    """A leading UTF-8 byte-order mark does not end up in the first key."""
    stream = io.BytesIO(
        b'\xef\xbb\xbfv015xxxxdate=2024-02-15 time=09:10:11 msg="blocked request"\n'
    )

    records, _union_keys = parse_log_file(stream)

    assert records[0]["v015xxxxdate"] == "2024-02-15"


def test_parse_log_file_supports_cp1252_text():
    """Input encoded as cp1252 is decoded so accented characters survive."""
    stream = io.BytesIO(
        'v015xxxxdate=2024-02-15 time=09:10:11 msg="caf\xe9 request"\n'.encode("cp1252")
    )

    records, _union_keys = parse_log_file(stream)

    # Same escape as the input literal, so the round trip is easy to verify.
    assert records[0]["msg"] == "caf\xe9 request"


def test_parse_log_file_tolerates_unterminated_quotes():
    """A value whose closing quote is missing is still returned, unquoted."""
    stream = io.BytesIO(
        b'v015xxxxdate=2024-02-15 time=09:10:11 msg="broken quoted value\n'
    )

    records, _union_keys = parse_log_file(stream)

    assert records[0]["msg"] == "broken quoted value"


def test_parse_log_file_rebuilds_record_after_embedded_newlines():
    """A quoted value split across lines is rejoined into a single record."""
    stream = io.BytesIO(
        b'v015xxxxdate=2024-02-15 time=09:10:11 msg="hello\n'
        b'broken-fragment\n'
        b'world" action=Alert\n'
        b'v015xxxxdate=2024-02-15 time=09:10:12 msg="next" action=Monitor\n'
    )

    records, _union_keys = parse_log_file(stream)

    # Three physical lines collapse into the first record; the fourth is its own.
    assert len(records) == 2
    # The fragments are concatenated with the embedded newlines removed.
    assert records[0]["msg"] == "hellobroken-fragmentworld"
    assert records[0]["action"] == "Alert"
    assert records[1]["msg"] == "next"