from __future__ import annotations

import json
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Any

from django.conf import settings

from apps.ingestion.snapshots import SnapshotSchemaValidator

logger = logging.getLogger(__name__)

class ExtractorError(RuntimeError):
|
|
"""Base extractor exception."""
|
|
|
|
|
|
class ExtractorConfigError(ExtractorError):
|
|
"""Raised when extractor configuration is invalid."""
|
|
|
|
|
|
class ExtractorFetchError(ExtractorError):
|
|
"""Raised when remote/source fetch fails."""
|
|
|
|
|
|
class ExtractorParseError(ExtractorError):
|
|
"""Raised when fetched content cannot be parsed."""
|
|
|
|
|
|
class ExtractorNormalizationError(ExtractorError):
|
|
"""Raised when source rows cannot be normalized."""
|
|
|
|
|
|
@dataclass
|
|
class ExtractionResult:
|
|
extractor_name: str
|
|
source_name: str
|
|
snapshot_date: date
|
|
records_count: int
|
|
output_path: Path | None
|
|
|
|
|
|
class BaseSnapshotExtractor(ABC):
|
|
extractor_name = "base"
|
|
source_name = "unknown_source"
|
|
|
|
@abstractmethod
|
|
def fetch(self) -> Any:
|
|
"""Fetch source payload from a source endpoint/resource."""
|
|
|
|
@abstractmethod
|
|
def parse(self, payload: Any) -> list[dict[str, Any]]:
|
|
"""Parse fetched payload into source-specific record dictionaries."""
|
|
|
|
@abstractmethod
|
|
def normalize_record(self, source_record: dict[str, Any]) -> dict[str, Any]:
|
|
"""Normalize a source record into HoopScout snapshot record shape."""
|
|
|
|
def resolve_snapshot_date(self) -> date:
|
|
return date.today()
|
|
|
|
def normalize_records(self, source_records: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
|
normalized: list[dict[str, Any]] = []
|
|
for idx, row in enumerate(source_records):
|
|
if not isinstance(row, dict):
|
|
raise ExtractorNormalizationError(f"Parsed record at index {idx} must be an object.")
|
|
normalized.append(self.normalize_record(row))
|
|
return normalized
|
|
|
|
def build_snapshot(self, records: list[dict[str, Any]], snapshot_date: date) -> dict[str, Any]:
|
|
return {
|
|
"source_name": self.source_name,
|
|
"snapshot_date": snapshot_date.isoformat(),
|
|
"records": records,
|
|
}
|
|
|
|
def default_output_dir(self) -> Path:
|
|
return Path(settings.STATIC_DATASET_INCOMING_DIR)
|
|
|
|
def snapshot_filename(self, snapshot_date: date) -> str:
|
|
return f"{self.extractor_name}-{snapshot_date.isoformat()}.json"
|
|
|
|
def emit_snapshot(
|
|
self,
|
|
snapshot: dict[str, Any],
|
|
*,
|
|
output_path: str | Path | None = None,
|
|
indent: int = 2,
|
|
) -> Path:
|
|
if output_path is None:
|
|
destination = self.default_output_dir()
|
|
destination.mkdir(parents=True, exist_ok=True)
|
|
file_path = destination / self.snapshot_filename(date.fromisoformat(snapshot["snapshot_date"]))
|
|
else:
|
|
target = Path(output_path)
|
|
if target.suffix.lower() == ".json":
|
|
target.parent.mkdir(parents=True, exist_ok=True)
|
|
file_path = target
|
|
else:
|
|
target.mkdir(parents=True, exist_ok=True)
|
|
file_path = target / self.snapshot_filename(date.fromisoformat(snapshot["snapshot_date"]))
|
|
|
|
file_path.write_text(json.dumps(snapshot, indent=indent, ensure_ascii=True), encoding="utf-8")
|
|
return file_path
|
|
|
|
def run(
|
|
self,
|
|
*,
|
|
output_path: str | Path | None = None,
|
|
snapshot_date: date | None = None,
|
|
write_output: bool = True,
|
|
indent: int = 2,
|
|
) -> ExtractionResult:
|
|
payload = self.fetch()
|
|
source_rows = self.parse(payload)
|
|
normalized_rows = self.normalize_records(source_rows)
|
|
resolved_snapshot_date = snapshot_date or self.resolve_snapshot_date()
|
|
snapshot = self.build_snapshot(normalized_rows, resolved_snapshot_date)
|
|
validated = SnapshotSchemaValidator.validate(snapshot)
|
|
snapshot["records"] = validated.records
|
|
|
|
output_file: Path | None = None
|
|
if write_output:
|
|
output_file = self.emit_snapshot(snapshot, output_path=output_path, indent=indent)
|
|
logger.info(
|
|
"extractor_snapshot_written extractor=%s source=%s records=%s path=%s",
|
|
self.extractor_name,
|
|
validated.source_name,
|
|
len(validated.records),
|
|
output_file,
|
|
)
|
|
else:
|
|
logger.info(
|
|
"extractor_snapshot_validated extractor=%s source=%s records=%s write_output=0",
|
|
self.extractor_name,
|
|
validated.source_name,
|
|
len(validated.records),
|
|
)
|
|
|
|
return ExtractionResult(
|
|
extractor_name=self.extractor_name,
|
|
source_name=validated.source_name,
|
|
snapshot_date=validated.snapshot_date,
|
|
records_count=len(validated.records),
|
|
output_path=output_file,
|
|
)
|