Add v2 snapshot schema validation and import_snapshots command
This commit is contained in:
310
apps/ingestion/services/snapshot_import.py
Normal file
310
apps/ingestion/services/snapshot_import.py
Normal file
@ -0,0 +1,310 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import shutil
|
||||
from dataclasses import dataclass
|
||||
from datetime import date, datetime
|
||||
from decimal import Decimal
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from django.db import transaction
|
||||
from django.template.defaultfilters import slugify
|
||||
from django.utils import timezone
|
||||
from django.utils.dateparse import parse_date
|
||||
|
||||
from apps.competitions.models import Competition, Season
|
||||
from apps.ingestion.models import ImportFile, ImportRun
|
||||
from apps.ingestion.snapshots import SnapshotSchemaValidator, SnapshotValidationError
|
||||
from apps.players.models import Nationality, Player, Position, Role
|
||||
from apps.stats.models import PlayerSeason, PlayerSeasonStats
|
||||
from apps.teams.models import Team
|
||||
|
||||
|
||||
@dataclass
class ImportSummary:
    """Mutable tally of progress for one import run, aggregated across files."""

    # Number of candidate .json files discovered in the incoming directory.
    files_total: int = 0
    # Files actually handled this run (succeeded, failed, or skipped).
    files_processed: int = 0
    # Sum of record counts across all validated files.
    rows_total: int = 0
    # Records successfully written to the database.
    rows_upserted: int = 0
    # Record-level failures, plus one per file that failed outright.
    rows_failed: int = 0
|
||||
|
||||
|
||||
def _safe_move(src: Path, destination_dir: Path) -> Path:
|
||||
destination_dir.mkdir(parents=True, exist_ok=True)
|
||||
candidate = destination_dir / src.name
|
||||
if candidate.exists():
|
||||
ts = datetime.utcnow().strftime("%Y%m%d%H%M%S")
|
||||
candidate = destination_dir / f"{src.stem}-{ts}{src.suffix}"
|
||||
shutil.move(str(src), str(candidate))
|
||||
return candidate
|
||||
|
||||
|
||||
def _file_checksum(path: Path) -> str:
|
||||
digest = hashlib.sha256()
|
||||
with path.open("rb") as handle:
|
||||
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
|
||||
digest.update(chunk)
|
||||
return digest.hexdigest()
|
||||
|
||||
|
||||
def _normalize_decimal(value: float | int | str) -> Decimal:
|
||||
return Decimal(str(value)).quantize(Decimal("0.01"))
|
||||
|
||||
|
||||
def _parse_season_dates(label: str) -> tuple[date, date]:
|
||||
if "-" in label:
|
||||
first = label.split("-", 1)[0]
|
||||
else:
|
||||
first = label
|
||||
year = int(first)
|
||||
return date(year, 9, 1), date(year + 1, 7, 31)
|
||||
|
||||
|
||||
def _resolve_nationality(value: str | None) -> Nationality | None:
    """Map a free-form nationality string to a Nationality row.

    Two-letter tokens are treated as ISO alpha-2 codes and created on
    demand; anything else is matched case-insensitively against existing
    names (which may yield None). Blank or None input resolves to None.
    """
    if not value:
        return None
    token = value.strip()
    if not token:
        return None
    if len(token) != 2:
        return Nationality.objects.filter(name__iexact=token).first()
    iso_code = token.upper()
    nationality, _created = Nationality.objects.get_or_create(
        iso2_code=iso_code,
        defaults={"name": iso_code},
    )
    return nationality
|
||||
|
||||
|
||||
def _position_code(position_value: str) -> str:
|
||||
token = position_value.strip().upper().replace(" ", "_")
|
||||
return (token[:10] or "UNK")
|
||||
|
||||
|
||||
def _role_code(role_value: str) -> str:
    """Normalise a role label into a slug-style code (max 32 chars)."""
    slug_token = slugify(role_value).replace("-", "_")
    code = slug_token[:32]
    return code if code else "unknown"
|
||||
|
||||
|
||||
def _player_season_source_uid(record: dict[str, Any], source_name: str, snapshot_date: date) -> str:
|
||||
return (
|
||||
f"{source_name}:{snapshot_date.isoformat()}:"
|
||||
f"{record['competition_external_id']}:{record['season']}:"
|
||||
f"{record['team_external_id']}:{record['player_external_id']}"
|
||||
)
|
||||
|
||||
|
||||
def _upsert_record(record: dict[str, Any], *, source_name: str, snapshot_date: date) -> None:
    """Idempotently upsert one validated snapshot record into the domain tables.

    Writes, in dependency order: Competition, Season, Team, Position
    (+ optional Role), Player, PlayerSeason, PlayerSeasonStats. Expected
    to run inside the caller's transaction. Raises KeyError on missing
    record keys and ValueError on malformed numeric values; the caller
    treats either as a file-level failure.
    """
    # Fall back to an id-derived slug when the name slugifies to "".
    competition_slug = slugify(record["competition_name"]) or f"competition-{record['competition_external_id']}"
    competition, _ = Competition.objects.update_or_create(
        source_uid=record["competition_external_id"],
        defaults={
            "name": record["competition_name"],
            "slug": competition_slug,
            "competition_type": Competition.CompetitionType.LEAGUE,
            "is_active": True,
        },
    )

    start_date, end_date = _parse_season_dates(record["season"])
    # NOTE(review): is_current is always reset to False here — presumably
    # a separate process marks the current season; confirm.
    season, _ = Season.objects.update_or_create(
        source_uid=f"season:{record['season']}",
        defaults={
            "label": record["season"],
            "start_date": start_date,
            "end_date": end_date,
            "is_current": False,
        },
    )

    team_slug = slugify(record["team_name"]) or f"team-{record['team_external_id']}"
    team, _ = Team.objects.update_or_create(
        source_uid=record["team_external_id"],
        defaults={
            "name": record["team_name"],
            "slug": team_slug,
            "short_name": "",
        },
    )

    # get_or_create (not update_or_create): an existing Position/Role name
    # is never overwritten by later snapshots.
    position, _ = Position.objects.get_or_create(
        code=_position_code(record["position"]),
        defaults={"name": record["position"]},
    )
    role = None
    if record.get("role"):
        role, _ = Role.objects.get_or_create(
            code=_role_code(record["role"]),
            defaults={"name": record["role"]},
        )

    player, _ = Player.objects.update_or_create(
        source_uid=record["player_external_id"],
        defaults={
            "first_name": record["first_name"],
            "last_name": record["last_name"],
            "full_name": record["full_name"],
            # parse_date returns None for malformed dates rather than raising.
            "birth_date": parse_date(record["birth_date"]),
            "nationality": _resolve_nationality(record.get("nationality")),
            "nominal_position": position,
            "inferred_role": role,
            "height_cm": record["height_cm"],
            "weight_kg": record["weight_kg"],
            "is_active": True,
        },
    )

    # Keyed on a composite natural uid so re-importing the same snapshot
    # updates in place instead of duplicating rows.
    player_season, _ = PlayerSeason.objects.update_or_create(
        source_uid=_player_season_source_uid(record, source_name=source_name, snapshot_date=snapshot_date),
        defaults={
            "player": player,
            "season": season,
            "team": team,
            "competition": competition,
            "games_played": int(record["games_played"]),
            "games_started": 0,
            # Total minutes reconstructed from the per-game average.
            "minutes_played": int(round(float(record["minutes_per_game"]) * int(record["games_played"]))),
        },
    )

    PlayerSeasonStats.objects.update_or_create(
        player_season=player_season,
        defaults={
            "points": _normalize_decimal(record["points_per_game"]),
            "rebounds": _normalize_decimal(record["rebounds_per_game"]),
            "assists": _normalize_decimal(record["assists_per_game"]),
            "steals": _normalize_decimal(record["steals_per_game"]),
            "blocks": _normalize_decimal(record["blocks_per_game"]),
            "turnovers": _normalize_decimal(record["turnovers_per_game"]),
            "fg_pct": _normalize_decimal(record["fg_pct"]),
            "three_pct": _normalize_decimal(record["three_pt_pct"]),
            "ft_pct": _normalize_decimal(record["ft_pct"]),
        },
    )
|
||||
|
||||
|
||||
class SnapshotImporter:
    """Drive one end-to-end import of snapshot JSON files.

    Scans ``incoming_dir`` for ``*.json`` files, validates each against the
    snapshot schema, upserts its records transactionally, and moves the
    file to ``archive_dir`` on success (or ``failed_dir`` on failure).
    Progress and errors are persisted on ImportRun / ImportFile rows.
    """

    def __init__(self, *, incoming_dir: str, archive_dir: str, failed_dir: str):
        self.incoming_dir = Path(incoming_dir)
        self.archive_dir = Path(archive_dir)
        self.failed_dir = Path(failed_dir)

    def _list_input_files(self) -> list[Path]:
        """Return the .json files in the incoming dir, sorted for determinism."""
        if not self.incoming_dir.exists():
            return []
        return sorted(path for path in self.incoming_dir.iterdir() if path.is_file() and path.suffix.lower() == ".json")

    def run(self, *, triggered_by=None) -> ImportRun:
        """Process every pending snapshot file and return the finished ImportRun.

        Each file is handled independently: a failure in one file does not
        stop the others. Per-file record writes happen inside a single
        transaction, so a file is imported entirely or not at all.
        """
        run = ImportRun.objects.create(
            source="static_snapshot_json",
            status=ImportRun.RunStatus.RUNNING,
            triggered_by=triggered_by,
            started_at=timezone.now(),
            context={
                "incoming_dir": str(self.incoming_dir),
                "archive_dir": str(self.archive_dir),
                "failed_dir": str(self.failed_dir),
            },
        )

        summary = ImportSummary()
        files = self._list_input_files()
        summary.files_total = len(files)

        for path in files:
            checksum = _file_checksum(path)
            file_row = ImportFile.objects.create(
                import_run=run,
                relative_path=path.name,
                status=ImportFile.FileStatus.PROCESSING,
                checksum=checksum,
                file_size_bytes=path.stat().st_size,
            )

            # Duplicate file content previously imported successfully.
            already_imported = ImportFile.objects.filter(
                checksum=checksum,
                status=ImportFile.FileStatus.SUCCESS,
            ).exclude(pk=file_row.pk).exists()
            if already_imported:
                # Skipped duplicates are still archived and counted as processed.
                file_row.status = ImportFile.FileStatus.SKIPPED
                file_row.error_message = "Skipped duplicate checksum already imported successfully."
                file_row.processed_at = timezone.now()
                file_row.save(update_fields=["status", "error_message", "processed_at"])
                _safe_move(path, self.archive_dir)
                summary.files_processed += 1
                continue

            try:
                payload = json.loads(path.read_text(encoding="utf-8"))
                validated = SnapshotSchemaValidator.validate(payload)

                file_row.source_name = validated.source_name
                file_row.snapshot_date = validated.snapshot_date
                file_row.rows_total = len(validated.records)

                # All-or-nothing: a failing record rolls back the whole file.
                with transaction.atomic():
                    for record in validated.records:
                        _upsert_record(record, source_name=validated.source_name, snapshot_date=validated.snapshot_date)

                file_row.status = ImportFile.FileStatus.SUCCESS
                file_row.rows_upserted = len(validated.records)
                file_row.payload_preview = {
                    "source_name": validated.source_name,
                    "snapshot_date": validated.snapshot_date.isoformat(),
                    "sample_record": validated.records[0],
                }
                _safe_move(path, self.archive_dir)
            except (json.JSONDecodeError, SnapshotValidationError, ValueError) as exc:
                # Expected failure modes: malformed JSON, schema violations,
                # bad numeric/date values surfaced by the upsert helpers.
                file_row.status = ImportFile.FileStatus.FAILED
                file_row.error_message = str(exc)
                _safe_move(path, self.failed_dir)
            except Exception as exc:  # noqa: BLE001
                # Catch-all so one broken file cannot abort the whole run.
                file_row.status = ImportFile.FileStatus.FAILED
                file_row.error_message = f"Unhandled import error: {exc}"
                _safe_move(path, self.failed_dir)

            file_row.processed_at = timezone.now()
            file_row.save(
                update_fields=[
                    "source_name",
                    "snapshot_date",
                    "status",
                    "rows_total",
                    "rows_upserted",
                    "rows_failed",
                    "error_message",
                    "payload_preview",
                    "processed_at",
                ]
            )

            summary.files_processed += 1
            summary.rows_total += file_row.rows_total
            summary.rows_upserted += file_row.rows_upserted
            # Each failed file counts as one additional failure unit.
            summary.rows_failed += file_row.rows_failed + (1 if file_row.status == ImportFile.FileStatus.FAILED else 0)

        run.status = ImportRun.RunStatus.SUCCESS if summary.rows_failed == 0 else ImportRun.RunStatus.FAILED
        run.files_total = summary.files_total
        run.files_processed = summary.files_processed
        run.rows_total = summary.rows_total
        run.rows_upserted = summary.rows_upserted
        run.rows_failed = summary.rows_failed
        run.finished_at = timezone.now()
        if summary.rows_failed:
            run.error_summary = f"{summary.rows_failed} file/row import error(s)."
        run.save(
            update_fields=[
                "status",
                "files_total",
                "files_processed",
                "rows_total",
                "rows_upserted",
                "rows_failed",
                "error_summary",
                "finished_at",
            ]
        )
        return run
|
||||
Reference in New Issue
Block a user