feat(v2): add simple daily extraction-import orchestration
This commit is contained in:
@ -0,0 +1,45 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from django.utils.dateparse import parse_date
|
||||
|
||||
from apps.ingestion.services.daily_orchestration import run_daily_orchestration
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Run daily HoopScout v2 workflow: extract snapshots, then import snapshots."
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument(
|
||||
"--snapshot-date",
|
||||
dest="snapshot_date",
|
||||
default=None,
|
||||
help="Override snapshot date for all extractor outputs (YYYY-MM-DD).",
|
||||
)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
snapshot_date = None
|
||||
if options["snapshot_date"]:
|
||||
snapshot_date = parse_date(options["snapshot_date"])
|
||||
if snapshot_date is None:
|
||||
raise CommandError("--snapshot-date must be YYYY-MM-DD.")
|
||||
|
||||
try:
|
||||
result = run_daily_orchestration(snapshot_date=snapshot_date)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
raise CommandError(str(exc)) from exc
|
||||
|
||||
extractor_summary = ", ".join(
|
||||
f"{row.extractor_name}:{row.records_count}" for row in result.extractors_run
|
||||
)
|
||||
self.stdout.write(
|
||||
self.style.SUCCESS(
|
||||
"Daily orchestration completed: "
|
||||
f"extractors=[{extractor_summary}] "
|
||||
f"import_run={result.import_run_id} "
|
||||
f"import_status={result.import_status} "
|
||||
f"files_processed={result.files_processed} "
|
||||
f"rows_upserted={result.rows_upserted} "
|
||||
f"rows_failed={result.rows_failed}"
|
||||
)
|
||||
)
|
||||
84
apps/ingestion/services/daily_orchestration.py
Normal file
84
apps/ingestion/services/daily_orchestration.py
Normal file
@ -0,0 +1,84 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from apps.ingestion.extractors import create_extractor
|
||||
from apps.ingestion.services.snapshot_import import SnapshotImporter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ExtractorRunSummary:
|
||||
extractor_name: str
|
||||
records_count: int
|
||||
output_path: Path | None
|
||||
|
||||
|
||||
@dataclass
|
||||
class DailyOrchestrationResult:
|
||||
extractors_run: list[ExtractorRunSummary]
|
||||
import_run_id: int
|
||||
import_status: str
|
||||
files_processed: int
|
||||
rows_upserted: int
|
||||
rows_failed: int
|
||||
|
||||
|
||||
def parse_enabled_extractors(raw_value: str) -> list[str]:
|
||||
return [item.strip() for item in raw_value.split(",") if item.strip()]
|
||||
|
||||
|
||||
def run_daily_orchestration(*, snapshot_date: date | None = None) -> DailyOrchestrationResult:
|
||||
extractor_names = parse_enabled_extractors(settings.DAILY_ORCHESTRATION_EXTRACTORS)
|
||||
if not extractor_names:
|
||||
raise ValueError("DAILY_ORCHESTRATION_EXTRACTORS cannot be empty.")
|
||||
|
||||
summaries: list[ExtractorRunSummary] = []
|
||||
for extractor_name in extractor_names:
|
||||
logger.info("daily_orchestration_extractor_start extractor=%s", extractor_name)
|
||||
extractor = create_extractor(extractor_name)
|
||||
result = extractor.run(snapshot_date=snapshot_date)
|
||||
summaries.append(
|
||||
ExtractorRunSummary(
|
||||
extractor_name=extractor_name,
|
||||
records_count=result.records_count,
|
||||
output_path=result.output_path,
|
||||
)
|
||||
)
|
||||
logger.info(
|
||||
"daily_orchestration_extractor_done extractor=%s records=%s output=%s",
|
||||
extractor_name,
|
||||
result.records_count,
|
||||
result.output_path,
|
||||
)
|
||||
|
||||
importer = SnapshotImporter(
|
||||
incoming_dir=settings.STATIC_DATASET_INCOMING_DIR,
|
||||
archive_dir=settings.STATIC_DATASET_ARCHIVE_DIR,
|
||||
failed_dir=settings.STATIC_DATASET_FAILED_DIR,
|
||||
)
|
||||
import_run = importer.run()
|
||||
logger.info(
|
||||
"daily_orchestration_import_done run_id=%s status=%s files=%s/%s upserted=%s failed=%s",
|
||||
import_run.id,
|
||||
import_run.status,
|
||||
import_run.files_processed,
|
||||
import_run.files_total,
|
||||
import_run.rows_upserted,
|
||||
import_run.rows_failed,
|
||||
)
|
||||
|
||||
return DailyOrchestrationResult(
|
||||
extractors_run=summaries,
|
||||
import_run_id=import_run.id,
|
||||
import_status=import_run.status,
|
||||
files_processed=import_run.files_processed,
|
||||
rows_upserted=import_run.rows_upserted,
|
||||
rows_failed=import_run.rows_failed,
|
||||
)
|
||||
Reference in New Issue
Block a user