import json import os from pathlib import Path from app.services.storage import cleanup_expired_outputs, delete_result_files, persist_result def test_persist_result_writes_csv_and_collects_preview(tmp_path: Path): metadata, export_result = persist_result( output_dir=tmp_path, records=[ { "v015xxxxdate": "2024-05-01", "time": "10:00:00", "policy": "Prod Policy", "severity_level": "high", }, { "v015xxxxdate": "2024-05-02", "time": "11:00:00", "policy": "Other Policy", "severity_level": "low", }, ], union_keys=["v015xxxxdate", "time", "policy", "severity_level"], mode="full", output_format="csv", preview_record_limit=1, ) written = Path(metadata.file_path).read_text(encoding="utf-8") assert metadata.download_name == "waf-report.csv" assert "v015xxxxdate,time,policy,severity_level" in written assert "2024-05-01,10:00:00,Prod Policy,high" in written assert export_result.preview(1).count("\n") == 1 def test_delete_result_files_removes_output_and_metadata(tmp_path: Path): result_id = "delete-me" output_file = tmp_path / f"{result_id}.txt" metadata_file = tmp_path / f"{result_id}.json" output_file.write_text("content", encoding="utf-8") metadata_file.write_text("{}", encoding="utf-8") delete_result_files(output_dir=tmp_path, result_id=result_id) assert not output_file.exists() assert not metadata_file.exists() def test_cleanup_expired_outputs_removes_only_old_results(tmp_path: Path): old_result_id = "old-result" new_result_id = "new-result" old_output = tmp_path / f"{old_result_id}.csv" old_metadata = tmp_path / f"{old_result_id}.json" new_output = tmp_path / f"{new_result_id}.csv" new_metadata = tmp_path / f"{new_result_id}.json" old_output.write_text("old", encoding="utf-8") new_output.write_text("new", encoding="utf-8") old_metadata.write_text(json.dumps({"result_id": old_result_id, "file_path": str(old_output)}), encoding="utf-8") new_metadata.write_text(json.dumps({"result_id": new_result_id, "file_path": str(new_output)}), encoding="utf-8") old_timestamp = 946684800 os.utime(old_output, (old_timestamp, old_timestamp)) os.utime(old_metadata, (old_timestamp, old_timestamp)) deleted_results = cleanup_expired_outputs(output_dir=tmp_path, retention_hours=1) assert deleted_results == 1 assert not old_output.exists() assert not old_metadata.exists() assert new_output.exists() assert new_metadata.exists()