from __future__ import annotations

import json
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Any

from django.conf import settings

from apps.ingestion.snapshots import SnapshotSchemaValidator

logger = logging.getLogger(__name__)


class ExtractorError(RuntimeError):
    """Root of the extractor exception hierarchy."""


class ExtractorConfigError(ExtractorError):
    """Signals that an extractor was configured incorrectly."""


class ExtractorFetchError(ExtractorError):
    """Signals that fetching from the remote/source failed."""


class ExtractorParseError(ExtractorError):
    """Signals that the fetched content could not be parsed."""


class ExtractorNormalizationError(ExtractorError):
    """Signals that source rows could not be normalized."""


@dataclass
class ExtractionResult:
    """Summary of one extractor run, as returned by ``BaseSnapshotExtractor.run``.

    ``output_path`` is ``None`` when the run was asked not to write a file.
    """

    extractor_name: str
    source_name: str
    snapshot_date: date
    records_count: int
    output_path: Path | None


class BaseSnapshotExtractor(ABC):
    """Skeleton for snapshot extractors.

    Subclasses supply ``fetch``, ``parse`` and ``normalize_record``; ``run``
    chains them, builds the snapshot dictionary, passes it through
    ``SnapshotSchemaValidator.validate`` and optionally writes it as JSON.
    """

    extractor_name = "base"
    source_name = "unknown_source"

    @abstractmethod
    def fetch(self) -> Any:
        """Retrieve the raw payload from the source endpoint/resource."""

    @abstractmethod
    def parse(self, payload: Any) -> list[dict[str, Any]]:
        """Turn the fetched payload into source-specific record dictionaries."""

    @abstractmethod
    def normalize_record(self, source_record: dict[str, Any]) -> dict[str, Any]:
        """Convert one source record into the HoopScout snapshot record shape."""

    def resolve_snapshot_date(self) -> date:
        """Return the date used when the caller does not provide one (today)."""
        return date.today()

    def normalize_records(self, source_records: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Normalize every parsed record, in order.

        Raises:
            ExtractorNormalizationError: at the first entry that is not a
                ``dict``; entries before it have already been normalized.
        """
        cleaned: list[dict[str, Any]] = []
        for position, candidate in enumerate(source_records):
            if isinstance(candidate, dict):
                cleaned.append(self.normalize_record(candidate))
                continue
            raise ExtractorNormalizationError(
                f"Parsed record at index {position} must be an object."
            )
        return cleaned

    def build_snapshot(self, records: list[dict[str, Any]], snapshot_date: date) -> dict[str, Any]:
        """Assemble the snapshot dictionary; the date is stored as an ISO string."""
        snapshot: dict[str, Any] = {}
        snapshot["source_name"] = self.source_name
        snapshot["snapshot_date"] = snapshot_date.isoformat()
        snapshot["records"] = records
        return snapshot

    def default_output_dir(self) -> Path:
        """Return ``settings.STATIC_DATASET_INCOMING_DIR`` as a ``Path``."""
        return Path(settings.STATIC_DATASET_INCOMING_DIR)

    def snapshot_filename(self, snapshot_date: date) -> str:
        """Return ``<extractor_name>-<ISO date>.json`` for the given date."""
        iso_day = snapshot_date.isoformat()
        return f"{self.extractor_name}-{iso_day}.json"

    def emit_snapshot(
        self,
        snapshot: dict[str, Any],
        *,
        output_path: str | Path | None = None,
        indent: int = 2,
    ) -> Path:
        """Serialize ``snapshot`` to a JSON file and return the path written.

        Args:
            snapshot: Snapshot dictionary; ``snapshot["snapshot_date"]`` must be
                an ISO date string whenever the filename has to be derived.
            output_path: ``None`` writes into ``default_output_dir()``. A path
                whose suffix is ``.json`` (case-insensitive) is used as the
                file itself. Any other path is treated as a directory.
            indent: Indentation passed to ``json.dumps``.

        Missing directories are created before writing.
        """
        explicit = None if output_path is None else Path(output_path)

        if explicit is not None and explicit.suffix.lower() == ".json":
            # Caller named the file directly; only its parent must exist.
            explicit.parent.mkdir(parents=True, exist_ok=True)
            file_path = explicit
        else:
            # Directory mode: either the configured default or the given path.
            directory = self.default_output_dir() if explicit is None else explicit
            directory.mkdir(parents=True, exist_ok=True)
            stamp = date.fromisoformat(snapshot["snapshot_date"])
            file_path = directory / self.snapshot_filename(stamp)

        serialized = json.dumps(snapshot, indent=indent, ensure_ascii=True)
        file_path.write_text(serialized, encoding="utf-8")
        return file_path

    def run(
        self,
        *,
        output_path: str | Path | None = None,
        snapshot_date: date | None = None,
        write_output: bool = True,
        indent: int = 2,
    ) -> ExtractionResult:
        """Execute the full extraction pipeline.

        Steps: fetch, parse, normalize, build the snapshot, validate it, then
        write it unless ``write_output`` is false.

        Args:
            output_path: Forwarded to ``emit_snapshot``.
            snapshot_date: Date to stamp on the snapshot; when not given,
                ``resolve_snapshot_date()`` is used.
            write_output: When false, the snapshot is validated but not written.
            indent: Forwarded to ``emit_snapshot``.

        Returns:
            An ``ExtractionResult`` populated from the validated snapshot.

        Nothing is caught here: exceptions from the hooks, the validator or the
        file write propagate to the caller.
        """
        rows = self.normalize_records(self.parse(self.fetch()))

        effective_date = snapshot_date if snapshot_date else self.resolve_snapshot_date()
        snapshot = self.build_snapshot(rows, effective_date)

        validated = SnapshotSchemaValidator.validate(snapshot)
        # Persist the validator's view of the records rather than the raw ones.
        snapshot["records"] = validated.records

        written_to: Path | None = None
        if not write_output:
            logger.info(
                "extractor_snapshot_validated extractor=%s source=%s records=%s write_output=0",
                self.extractor_name,
                validated.source_name,
                len(validated.records),
            )
        else:
            written_to = self.emit_snapshot(snapshot, output_path=output_path, indent=indent)
            logger.info(
                "extractor_snapshot_written extractor=%s source=%s records=%s path=%s",
                self.extractor_name,
                validated.source_name,
                len(validated.records),
                written_to,
            )

        return ExtractionResult(
            extractor_name=self.extractor_name,
            source_name=validated.source_name,
            snapshot_date=validated.snapshot_date,
            records_count=len(validated.records),
            output_path=written_to,
        )