85 lines
2.6 KiB
Python
85 lines
2.6 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from datetime import date
|
|
from pathlib import Path
|
|
|
|
from django.conf import settings
|
|
|
|
from apps.ingestion.extractors import create_extractor
|
|
from apps.ingestion.services.snapshot_import import SnapshotImporter
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class ExtractorRunSummary:
    """Outcome of a single extractor run within a daily orchestration.

    ``run_daily_orchestration`` builds one instance per extractor from the
    name it was configured with and the ``records_count`` / ``output_path``
    attributes of the object returned by ``extractor.run()``.
    """

    # Name the extractor was created with.
    extractor_name: str
    # ``records_count`` reported by the extractor's run result.
    records_count: int
    # ``output_path`` reported by the run result; ``None`` is allowed.
    output_path: Path | None
|
|
|
|
|
|
@dataclass
class DailyOrchestrationResult:
    """Aggregate result returned by ``run_daily_orchestration``.

    Holds the per-extractor summaries plus figures copied from the import
    run object returned by ``SnapshotImporter.run()``.
    """

    # One summary per extractor, in the order the extractors were run.
    extractors_run: list[ExtractorRunSummary]
    # ``id`` of the import run.
    import_run_id: int
    # ``status`` of the import run.
    import_status: str
    # ``files_processed`` of the import run.
    files_processed: int
    # ``rows_upserted`` of the import run.
    rows_upserted: int
    # ``rows_failed`` of the import run.
    rows_failed: int
|
|
|
|
|
|
def parse_enabled_extractors(raw_value: str | None) -> list[str]:
    """Split a comma-separated setting value into extractor names.

    Each item is stripped of surrounding whitespace, and items that are
    empty after stripping are dropped. Order and duplicates are preserved.

    Args:
        raw_value: Comma-separated names, e.g. ``"a, b"``. ``None`` or an
            empty string is treated as "nothing configured".

    Returns:
        The cleaned names; an empty list when nothing usable is present.
    """
    if not raw_value:
        # Unset/empty setting: report "no extractors" rather than failing
        # with AttributeError on ``None.split``.
        return []
    stripped = (item.strip() for item in raw_value.split(","))
    return [name for name in stripped if name]
|
|
|
|
|
|
def run_daily_orchestration(*, snapshot_date: date | None = None) -> DailyOrchestrationResult:
    """Run every configured extractor, then import the produced snapshots.

    Extractor names come from ``settings.DAILY_ORCHESTRATION_EXTRACTORS``
    (comma-separated) and are run one after another in the listed order.
    Once all of them have returned, a ``SnapshotImporter`` built from the
    ``STATIC_DATASET_*`` directories in settings is run once.

    There is no exception handling here: if an extractor or the importer
    raises, the exception propagates and later steps are not executed.

    Args:
        snapshot_date: Forwarded unchanged to each extractor's ``run()``.
            How ``None`` is interpreted is up to the extractor.

    Returns:
        A ``DailyOrchestrationResult`` with one summary per extractor and
        the id, status and counters of the import run.

    Raises:
        ValueError: If the setting yields no extractor names.
    """
    extractor_names = parse_enabled_extractors(settings.DAILY_ORCHESTRATION_EXTRACTORS)
    if not extractor_names:
        raise ValueError("DAILY_ORCHESTRATION_EXTRACTORS cannot be empty.")

    summaries: list[ExtractorRunSummary] = []
    for extractor_name in extractor_names:
        logger.info("daily_orchestration_extractor_start extractor=%s", extractor_name)
        extractor = create_extractor(extractor_name)
        result = extractor.run(snapshot_date=snapshot_date)
        summaries.append(
            ExtractorRunSummary(
                extractor_name=extractor_name,
                records_count=result.records_count,
                output_path=result.output_path,
            )
        )
        logger.info(
            "daily_orchestration_extractor_done extractor=%s records=%s output=%s",
            extractor_name,
            result.records_count,
            result.output_path,
        )

    # The import runs once, after every extractor has finished.
    importer = SnapshotImporter(
        incoming_dir=settings.STATIC_DATASET_INCOMING_DIR,
        archive_dir=settings.STATIC_DATASET_ARCHIVE_DIR,
        failed_dir=settings.STATIC_DATASET_FAILED_DIR,
    )
    import_run = importer.run()
    logger.info(
        "daily_orchestration_import_done run_id=%s status=%s files=%s/%s upserted=%s failed=%s",
        import_run.id,
        import_run.status,
        import_run.files_processed,
        import_run.files_total,
        import_run.rows_upserted,
        import_run.rows_failed,
    )

    return DailyOrchestrationResult(
        extractors_run=summaries,
        import_run_id=import_run.id,
        import_status=import_run.status,
        files_processed=import_run.files_processed,
        rows_upserted=import_run.rows_upserted,
        rows_failed=import_run.rows_failed,
    )
|