feat(v2): add snapshot extractor framework and run command
This commit is contained in:
150
apps/ingestion/extractors/base.py
Normal file
150
apps/ingestion/extractors/base.py
Normal file
@ -0,0 +1,150 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from apps.ingestion.snapshots import SnapshotSchemaValidator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ExtractorError(RuntimeError):
    """Root of the extractor exception hierarchy; all pipeline failures derive from it."""
|
||||
|
||||
|
||||
class ExtractorConfigError(ExtractorError):
    """Signals that an extractor was set up with invalid or incomplete configuration."""
|
||||
|
||||
|
||||
class ExtractorFetchError(ExtractorError):
    """Signals that retrieving the payload from the remote/source failed."""
|
||||
|
||||
|
||||
class ExtractorParseError(ExtractorError):
    """Signals that the fetched content could not be parsed."""
|
||||
|
||||
|
||||
class ExtractorNormalizationError(ExtractorError):
    """Signals that source rows could not be normalized into the snapshot record shape."""
|
||||
|
||||
|
||||
@dataclass
class ExtractionResult:
    """Summary of a single extractor run, as returned by BaseSnapshotExtractor.run()."""

    # Identifier of the extractor implementation that produced the snapshot.
    extractor_name: str
    # Name of the upstream source the records came from (post-validation).
    source_name: str
    # Date the snapshot represents.
    snapshot_date: date
    # Number of records in the validated snapshot.
    records_count: int
    # Path the snapshot file was written to; None when write_output was disabled.
    output_path: Path | None
|
||||
|
||||
|
||||
class BaseSnapshotExtractor(ABC):
    """Template-method base class for source snapshot extractors.

    Subclasses implement :meth:`fetch`, :meth:`parse` and
    :meth:`normalize_record`; :meth:`run` then drives the full
    fetch -> parse -> normalize -> validate -> emit pipeline and returns
    an :class:`ExtractionResult` summary.
    """

    # Subclasses override these identifiers; they name the emitted file
    # and tag the snapshot envelope and log lines.
    extractor_name = "base"
    source_name = "unknown_source"

    @abstractmethod
    def fetch(self) -> Any:
        """Fetch source payload from a source endpoint/resource."""

    @abstractmethod
    def parse(self, payload: Any) -> list[dict[str, Any]]:
        """Parse fetched payload into source-specific record dictionaries."""

    @abstractmethod
    def normalize_record(self, source_record: dict[str, Any]) -> dict[str, Any]:
        """Normalize a source record into HoopScout snapshot record shape."""

    def resolve_snapshot_date(self) -> date:
        """Default snapshot date: today (local time). Subclasses may override."""
        return date.today()

    def normalize_records(self, source_records: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Normalize every parsed row via :meth:`normalize_record`.

        Raises:
            ExtractorNormalizationError: if any parsed row is not a dict.
        """
        normalized: list[dict[str, Any]] = []
        for idx, row in enumerate(source_records):
            if not isinstance(row, dict):
                raise ExtractorNormalizationError(f"Parsed record at index {idx} must be an object.")
            normalized.append(self.normalize_record(row))
        return normalized

    def build_snapshot(self, records: list[dict[str, Any]], snapshot_date: date) -> dict[str, Any]:
        """Assemble the snapshot envelope handed to the schema validator."""
        return {
            "source_name": self.source_name,
            "snapshot_date": snapshot_date.isoformat(),
            "records": records,
        }

    def default_output_dir(self) -> Path:
        """Directory snapshots are written to when no explicit path is given."""
        return Path(settings.STATIC_DATASET_INCOMING_DIR)

    def snapshot_filename(self, snapshot_date: date) -> str:
        """Canonical snapshot file name: ``<extractor_name>-<ISO date>.json``."""
        return f"{self.extractor_name}-{snapshot_date.isoformat()}.json"

    def _snapshot_date_from(self, snapshot: dict[str, Any]) -> date:
        """Parse the snapshot envelope's ISO ``snapshot_date`` back into a date."""
        return date.fromisoformat(snapshot["snapshot_date"])

    def emit_snapshot(
        self,
        snapshot: dict[str, Any],
        *,
        output_path: str | Path | None = None,
        indent: int = 2,
    ) -> Path:
        """Write *snapshot* as JSON and return the path it was written to.

        ``output_path`` may be omitted (default dir + canonical name),
        point at a ``.json`` file (used as-is), or point at a directory
        (canonical name appended). Missing directories are created.
        """
        if output_path is None:
            destination = self.default_output_dir()
            destination.mkdir(parents=True, exist_ok=True)
            file_path = destination / self.snapshot_filename(self._snapshot_date_from(snapshot))
        else:
            target = Path(output_path)
            if target.suffix.lower() == ".json":
                target.parent.mkdir(parents=True, exist_ok=True)
                file_path = target
            else:
                # Anything without a .json suffix is treated as a directory.
                target.mkdir(parents=True, exist_ok=True)
                file_path = target / self.snapshot_filename(self._snapshot_date_from(snapshot))

        file_path.write_text(json.dumps(snapshot, indent=indent, ensure_ascii=True), encoding="utf-8")
        return file_path

    def run(
        self,
        *,
        output_path: str | Path | None = None,
        snapshot_date: date | None = None,
        write_output: bool = True,
        indent: int = 2,
    ) -> ExtractionResult:
        """Execute the full extraction pipeline and return a run summary.

        Args:
            output_path: Optional file or directory for the emitted snapshot.
            snapshot_date: Overrides :meth:`resolve_snapshot_date` when given.
            write_output: When False, validate only; nothing is written.
            indent: JSON indentation for the emitted file.
        """
        payload = self.fetch()
        source_rows = self.parse(payload)
        normalized_rows = self.normalize_records(source_rows)
        resolved_snapshot_date = snapshot_date or self.resolve_snapshot_date()
        snapshot = self.build_snapshot(normalized_rows, resolved_snapshot_date)
        validated = SnapshotSchemaValidator.validate(snapshot)
        # Keep the emitted payload in sync with whatever the validator
        # normalized/coerced.
        snapshot["records"] = validated.records

        output_file: Path | None = None
        if write_output:
            output_file = self.emit_snapshot(snapshot, output_path=output_path, indent=indent)
            logger.info(
                "extractor_snapshot_written extractor=%s source=%s records=%s path=%s",
                self.extractor_name,
                validated.source_name,
                len(validated.records),
                output_file,
            )
        else:
            logger.info(
                "extractor_snapshot_validated extractor=%s source=%s records=%s write_output=0",
                self.extractor_name,
                validated.source_name,
                len(validated.records),
            )

        return ExtractionResult(
            extractor_name=self.extractor_name,
            source_name=validated.source_name,
            snapshot_date=validated.snapshot_date,
            records_count=len(validated.records),
            output_path=output_file,
        )
|
||||
Reference in New Issue
Block a user