Files
hoopscout/apps/ingestion/extractors/base.py
2026-03-13 14:24:54 +01:00

151 lines
4.9 KiB
Python

from __future__ import annotations
import json
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Any
from django.conf import settings
from apps.ingestion.snapshots import SnapshotSchemaValidator
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class ExtractorError(RuntimeError):
    """Common base class for the extractor-specific exceptions in this module."""
class ExtractorConfigError(ExtractorError):
    """Signals that an extractor's configuration is not valid."""
class ExtractorFetchError(ExtractorError):
    """Signals that fetching from the remote endpoint or source failed."""
class ExtractorParseError(ExtractorError):
    """Signals that the fetched content could not be parsed."""
class ExtractorNormalizationError(ExtractorError):
    """Signals that source rows could not be normalized."""
@dataclass
class ExtractionResult:
    """Summary of one extractor run.

    Field order is part of the positional constructor signature and must
    not be rearranged.
    """

    # Name of the extractor that produced the result.
    extractor_name: str
    # Name of the data source the snapshot describes.
    source_name: str
    # Date the snapshot is attributed to.
    snapshot_date: date
    # Number of records contained in the snapshot.
    records_count: int
    # File the snapshot was written to, or None when nothing was written.
    output_path: Path | None
class BaseSnapshotExtractor(ABC):
    """Abstract template for extractors that produce snapshot JSON files.

    Subclasses implement ``fetch``, ``parse`` and ``normalize_record``;
    ``run`` chains them, validates the assembled snapshot with
    ``SnapshotSchemaValidator`` and optionally writes it to disk.
    """

    # Used as the prefix of generated snapshot filenames and in log lines.
    extractor_name = "base"
    # Stored under the "source_name" key of every built snapshot.
    source_name = "unknown_source"

    @abstractmethod
    def fetch(self) -> Any:
        """Retrieve the raw payload from the source endpoint or resource."""

    @abstractmethod
    def parse(self, payload: Any) -> list[dict[str, Any]]:
        """Turn the fetched payload into a list of source-specific record dicts."""

    @abstractmethod
    def normalize_record(self, source_record: dict[str, Any]) -> dict[str, Any]:
        """Convert one source record into the HoopScout snapshot record shape."""

    def resolve_snapshot_date(self) -> date:
        """Return the date to use when the caller supplies none (today)."""
        return date.today()

    def normalize_records(self, source_records: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Normalize every parsed record, in order.

        Each record is checked and normalized one at a time, so records
        preceding a malformed one have already been passed to
        ``normalize_record`` when the error is raised.

        Raises:
            ExtractorNormalizationError: if a parsed record is not a dict.
        """
        result: list[dict[str, Any]] = []
        for idx, candidate in enumerate(source_records):
            if isinstance(candidate, dict):
                result.append(self.normalize_record(candidate))
                continue
            raise ExtractorNormalizationError(f"Parsed record at index {idx} must be an object.")
        return result

    def build_snapshot(self, records: list[dict[str, Any]], snapshot_date: date) -> dict[str, Any]:
        """Assemble the snapshot dict: source name, ISO date string and records."""
        snapshot: dict[str, Any] = {}
        snapshot["source_name"] = self.source_name
        snapshot["snapshot_date"] = snapshot_date.isoformat()
        snapshot["records"] = records
        return snapshot

    def default_output_dir(self) -> Path:
        """Return ``settings.STATIC_DATASET_INCOMING_DIR`` as a ``Path``."""
        incoming_dir = settings.STATIC_DATASET_INCOMING_DIR
        return Path(incoming_dir)

    def snapshot_filename(self, snapshot_date: date) -> str:
        """Return the filename ``<extractor_name>-<ISO date>.json``."""
        return f"{self.extractor_name}-{snapshot_date.isoformat()}.json"

    def emit_snapshot(
        self,
        snapshot: dict[str, Any],
        *,
        output_path: str | Path | None = None,
        indent: int = 2,
    ) -> Path:
        """Serialize ``snapshot`` as JSON and write it to disk.

        Args:
            snapshot: Snapshot dict; its "snapshot_date" entry must be an ISO
                date string whenever the filename has to be generated.
            output_path: ``None`` writes into ``default_output_dir()``. A path
                whose suffix is ``.json`` (case-insensitive) is used as the
                target file. Any other path is treated as a directory.
            indent: Indentation passed to ``json.dumps``.

        Returns:
            The path of the written file.
        """
        explicit_file: Path | None = None
        if output_path is None:
            directory = self.default_output_dir()
        else:
            candidate = Path(output_path)
            if candidate.suffix.lower() == ".json":
                explicit_file = candidate
                directory = candidate.parent
            else:
                directory = candidate

        # The directory is created before the filename is derived, so it
        # exists even if reading the snapshot date fails afterwards.
        directory.mkdir(parents=True, exist_ok=True)

        if explicit_file is not None:
            file_path = explicit_file
        else:
            stamp = date.fromisoformat(snapshot["snapshot_date"])
            file_path = directory / self.snapshot_filename(stamp)

        serialized = json.dumps(snapshot, indent=indent, ensure_ascii=True)
        file_path.write_text(serialized, encoding="utf-8")
        return file_path

    def run(
        self,
        *,
        output_path: str | Path | None = None,
        snapshot_date: date | None = None,
        write_output: bool = True,
        indent: int = 2,
    ) -> ExtractionResult:
        """Execute the full pipeline: fetch, parse, normalize, validate, write.

        Args:
            output_path: Forwarded to ``emit_snapshot`` when writing.
            snapshot_date: Date for the snapshot; falls back to
                ``resolve_snapshot_date()`` when omitted.
            write_output: When false, the snapshot is validated but not written.
            indent: JSON indentation forwarded to ``emit_snapshot``.

        Returns:
            An ``ExtractionResult`` built from the validator's output;
            ``output_path`` is ``None`` when nothing was written.

        Raises:
            ExtractorNormalizationError: if a parsed record is not a dict.
            Exceptions raised by ``fetch``, ``parse``, ``normalize_record``,
            the validator or the file write are not caught here.
        """
        normalized_rows = self.normalize_records(self.parse(self.fetch()))
        effective_date = snapshot_date if snapshot_date else self.resolve_snapshot_date()
        snapshot = self.build_snapshot(normalized_rows, effective_date)

        validated = SnapshotSchemaValidator.validate(snapshot)
        # Persist the validator's view of the records rather than the raw input.
        snapshot["records"] = validated.records
        record_total = len(validated.records)

        if not write_output:
            written_to: Path | None = None
            logger.info(
                "extractor_snapshot_validated extractor=%s source=%s records=%s write_output=0",
                self.extractor_name,
                validated.source_name,
                record_total,
            )
        else:
            written_to = self.emit_snapshot(snapshot, output_path=output_path, indent=indent)
            logger.info(
                "extractor_snapshot_written extractor=%s source=%s records=%s path=%s",
                self.extractor_name,
                validated.source_name,
                record_total,
                written_to,
            )

        return ExtractionResult(
            extractor_name=self.extractor_name,
            source_name=validated.source_name,
            snapshot_date=validated.snapshot_date,
            records_count=record_total,
            output_path=written_to,
        )