# Daily ingestion orchestration: run each configured extractor, then one
# SnapshotImporter pass over the configured dataset directories.
from __future__ import annotations

import logging
from dataclasses import dataclass
from datetime import date
from pathlib import Path

from django.conf import settings

from apps.ingestion.extractors import create_extractor
from apps.ingestion.services.snapshot_import import SnapshotImporter

logger = logging.getLogger(__name__)


@dataclass
class ExtractorRunSummary:
    """What a single extractor reported after being run."""

    # Name the extractor was created with.
    extractor_name: str
    # ``records_count`` taken from the extractor's run result.
    records_count: int
    # ``output_path`` taken from the extractor's run result; may be None.
    output_path: Path | None


@dataclass
class DailyOrchestrationResult:
    """Combined outcome of one ``run_daily_orchestration`` call."""

    # One summary per extractor, in the order the extractors were run.
    extractors_run: list[ExtractorRunSummary]
    # The remaining fields are copied from the object returned by
    # ``SnapshotImporter.run()``.
    import_run_id: int
    import_status: str
    files_processed: int
    rows_upserted: int
    rows_failed: int


def parse_enabled_extractors(raw_value: str) -> list[str]:
    """Split a comma-separated string into trimmed, non-empty names.

    Args:
        raw_value: Comma-separated extractor names.

    Returns:
        The names in their original order, with surrounding whitespace
        stripped and blank entries dropped.
    """
    trimmed = (piece.strip() for piece in raw_value.split(","))
    return [name for name in trimmed if name]


def _run_extractor(
    extractor_name: str, snapshot_date: date | None
) -> ExtractorRunSummary:
    """Create and run one extractor, logging its start and completion."""
    logger.info("daily_orchestration_extractor_start extractor=%s", extractor_name)
    outcome = create_extractor(extractor_name).run(snapshot_date=snapshot_date)
    records = outcome.records_count
    output = outcome.output_path
    summary = ExtractorRunSummary(
        extractor_name=extractor_name,
        records_count=records,
        output_path=output,
    )
    logger.info(
        "daily_orchestration_extractor_done extractor=%s records=%s output=%s",
        extractor_name,
        records,
        output,
    )
    return summary


def run_daily_orchestration(*, snapshot_date: date | None = None) -> DailyOrchestrationResult:
    """Run every enabled extractor, then a single snapshot import.

    The extractor names are read from
    ``settings.DAILY_ORCHESTRATION_EXTRACTORS`` and run sequentially in the
    listed order. Afterwards a ``SnapshotImporter`` is run against the
    ``STATIC_DATASET_*`` directories from settings.

    Args:
        snapshot_date: Forwarded unchanged to each extractor's ``run``.

    Returns:
        A ``DailyOrchestrationResult`` holding the per-extractor summaries
        and the counters of the import run.

    Raises:
        ValueError: If the setting yields no extractor names.
    """
    enabled = parse_enabled_extractors(settings.DAILY_ORCHESTRATION_EXTRACTORS)
    if not enabled:
        raise ValueError("DAILY_ORCHESTRATION_EXTRACTORS cannot be empty.")

    # Any exception from an extractor propagates and aborts the whole run
    # before the importer is reached.
    summaries: list[ExtractorRunSummary] = [
        _run_extractor(name, snapshot_date) for name in enabled
    ]

    import_run = SnapshotImporter(
        incoming_dir=settings.STATIC_DATASET_INCOMING_DIR,
        archive_dir=settings.STATIC_DATASET_ARCHIVE_DIR,
        failed_dir=settings.STATIC_DATASET_FAILED_DIR,
    ).run()

    logger.info(
        "daily_orchestration_import_done run_id=%s status=%s files=%s/%s upserted=%s failed=%s",
        import_run.id,
        import_run.status,
        import_run.files_processed,
        import_run.files_total,
        import_run.rows_upserted,
        import_run.rows_failed,
    )

    return DailyOrchestrationResult(
        extractors_run=summaries,
        import_run_id=import_run.id,
        import_status=import_run.status,
        files_processed=import_run.files_processed,
        rows_upserted=import_run.rows_upserted,
        rows_failed=import_run.rows_failed,
    )