"""Import static JSON snapshot files into the competition/team/player/stat models."""

from __future__ import annotations

import hashlib
import json
import shutil
from dataclasses import dataclass
from datetime import date, datetime
from datetime import timezone as dt_timezone
from decimal import Decimal
from pathlib import Path
from typing import Any

from django.db import transaction
from django.template.defaultfilters import slugify
from django.utils import timezone
from django.utils.dateparse import parse_date

from apps.competitions.models import Competition, Season
from apps.ingestion.models import ImportFile, ImportRun
from apps.ingestion.snapshots import SnapshotSchemaValidator, SnapshotValidationError
from apps.players.models import Nationality, Player, Position, Role
from apps.stats.models import PlayerSeason, PlayerSeasonStats
from apps.teams.models import Team


@dataclass
class ImportSummary:
    """Running counters accumulated by ``SnapshotImporter.run`` across all files."""

    files_total: int = 0
    files_processed: int = 0
    rows_total: int = 0
    rows_upserted: int = 0
    rows_failed: int = 0


def _safe_move(src: Path, destination_dir: Path) -> Path:
    """Move *src* into *destination_dir* without overwriting an existing file.

    The destination directory is created if missing. If a file with the same
    name already exists there, a UTC timestamp (``YYYYmmddHHMMSS``) is
    appended to the stem; if that name is taken as well, a numeric suffix is
    added until a free name is found.

    Returns:
        The final path of the moved file.
    """
    destination_dir.mkdir(parents=True, exist_ok=True)
    candidate = destination_dir / src.name
    if candidate.exists():
        # datetime.utcnow() is deprecated; an aware UTC "now" formats identically.
        ts = datetime.now(tz=dt_timezone.utc).strftime("%Y%m%d%H%M%S")
        candidate = destination_dir / f"{src.stem}-{ts}{src.suffix}"
        attempt = 1
        # Two collisions within the same second would otherwise clobber the
        # earlier archived file.
        while candidate.exists():
            candidate = destination_dir / f"{src.stem}-{ts}-{attempt}{src.suffix}"
            attempt += 1
    shutil.move(str(src), str(candidate))
    return candidate


def _file_checksum(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()


def _normalize_decimal(value: float | int | str) -> Decimal:
    """Convert *value* to a ``Decimal`` quantized to two decimal places.

    The value goes through ``str()`` first so floats keep their printed
    representation instead of their binary expansion. Rounding follows the
    active decimal context.
    """
    return Decimal(str(value)).quantize(Decimal("0.01"))


def _parse_season_dates(label: str) -> tuple[date, date]:
    """Derive ``(start_date, end_date)`` from a season label.

    The text before the first ``-`` (or the whole label when there is none)
    is parsed as the starting year. The season is taken to run from
    1 September of that year to 31 July of the following year.

    Raises:
        ValueError: If the leading token is not an integer.
    """
    # split() returns the whole label when no hyphen is present.
    year = int(label.split("-", 1)[0])
    return date(year, 9, 1), date(year + 1, 7, 31)


def _parse_optional_birth_date(value: str | None) -> date | None:
    """Parse *value* with Django's ``parse_date``; ``None`` or ``""`` yield ``None``."""
    if value in (None, ""):
        return None
    return parse_date(value)


def _split_name_parts(full_name: str) -> tuple[str, str]:
    """Split *full_name* into ``(first, rest)`` on the first whitespace run.

    Returns ``("", "")`` for blank input and ``(name, "")`` for a single token.
    """
    parts = full_name.strip().split(maxsplit=1)
    if not parts:
        return "", ""
    if len(parts) == 1:
        return parts[0], ""
    return parts[0], parts[1]


def _resolve_nationality(value: str | None) -> Nationality | None:
    """Resolve a nationality token to a ``Nationality`` row, if possible.

    A two-character token is treated as an ISO2 code: it is upper-cased and
    fetched or created (using the code as the default name). Any other token
    is matched case-insensitively against existing names and is never
    created. Empty or whitespace-only input returns ``None``.
    """
    if not value:
        return None
    token = value.strip()
    if not token:
        return None
    if len(token) == 2:
        code = token.upper()
        obj, _ = Nationality.objects.get_or_create(
            iso2_code=code,
            defaults={"name": code},
        )
        return obj
    return Nationality.objects.filter(name__iexact=token).first()


def _position_code(position_value: str) -> str:
    """Build a position code: upper-case, spaces to ``_``, max 10 chars, ``"UNK"`` if empty."""
    token = position_value.strip().upper().replace(" ", "_")
    return token[:10] or "UNK"


def _role_code(role_value: str) -> str:
    """Build a role code: slugified, ``-`` to ``_``, max 32 chars, ``"unknown"`` if empty."""
    token = slugify(role_value).replace("-", "_")
    return token[:32] or "unknown"


def _player_season_source_uid(record: dict[str, Any], source_name: str, snapshot_date: date) -> str:
    """Compose the ``PlayerSeason`` source uid for *record*.

    The uid joins, with ``:``, the source name, ISO snapshot date, and the
    record's competition id, season, team id and player id. Because the
    snapshot date is part of the uid, each snapshot date yields its own uid.
    """
    return (
        f"{source_name}:{snapshot_date.isoformat()}:"
        f"{record['competition_external_id']}:{record['season']}:"
        f"{record['team_external_id']}:{record['player_external_id']}"
    )


def _source_slug(*, source_name: str, base_name: str, fallback_prefix: str, fallback_external_id: str) -> str:
    """Return a slug of the form ``<source>-<name>``.

    If *base_name* slugifies to an empty string,
    ``<fallback_prefix>-<fallback_external_id>`` is used instead. If
    *source_name* slugifies to an empty string, ``"snapshot"`` is used.
    """
    base_slug = slugify(base_name) or f"{fallback_prefix}-{fallback_external_id}"
    source_slug = slugify(source_name) or "snapshot"
    return f"{source_slug}-{base_slug}"


def _normalized_source_name(source_name: str) -> str:
    """Return *source_name* stripped of surrounding whitespace and lower-cased."""
    return source_name.strip().lower()


def _upsert_record(record: dict[str, Any], *, source_name: str, snapshot_date: date) -> None:
    """Create or update every model row described by one snapshot record.

    In order: ``Competition``, ``Season``, ``Team``, optional ``Position``
    and ``Role``, ``Player``, ``PlayerSeason`` and ``PlayerSeasonStats``.
    No transaction is opened here; the caller decides the atomic scope.

    Args:
        record: One snapshot record. Required keys are read with ``[]`` and
            raise ``KeyError`` when missing; optional keys use ``.get``.
        source_name: Source name from the snapshot; normalized before use as
            a lookup key.
        snapshot_date: Snapshot date, embedded in the ``PlayerSeason`` uid.
    """
    source_key = _normalized_source_name(source_name)

    competition_slug = _source_slug(
        source_name=source_key,
        base_name=record["competition_name"],
        fallback_prefix="competition",
        fallback_external_id=record["competition_external_id"],
    )
    competition, _ = Competition.objects.update_or_create(
        source_name=source_key,
        source_uid=record["competition_external_id"],
        defaults={
            "name": record["competition_name"],
            "slug": competition_slug,
            "competition_type": Competition.CompetitionType.LEAGUE,
            "is_active": True,
        },
    )

    start_date, end_date = _parse_season_dates(record["season"])
    # NOTE(review): "is_current" is in ``defaults``, so every import resets it
    # to False on existing seasons too -- confirm this is intended.
    season, _ = Season.objects.update_or_create(
        source_uid=f"season:{record['season']}",
        defaults={
            "label": record["season"],
            "start_date": start_date,
            "end_date": end_date,
            "is_current": False,
        },
    )

    team_slug = _source_slug(
        source_name=source_key,
        base_name=record["team_name"],
        fallback_prefix="team",
        fallback_external_id=record["team_external_id"],
    )
    team, _ = Team.objects.update_or_create(
        source_name=source_key,
        source_uid=record["team_external_id"],
        defaults={
            "name": record["team_name"],
            "slug": team_slug,
            "short_name": "",
        },
    )

    position_value = record.get("position")
    position = None
    if position_value:
        position, _ = Position.objects.get_or_create(
            code=_position_code(position_value),
            defaults={"name": position_value},
        )

    role = None
    if record.get("role"):
        role, _ = Role.objects.get_or_create(
            code=_role_code(record["role"]),
            defaults={"name": record["role"]},
        )

    first_name = record.get("first_name") or ""
    last_name = record.get("last_name") or ""
    # Only fall back to splitting the full name when neither part was supplied.
    if not first_name and not last_name:
        first_name, last_name = _split_name_parts(record["full_name"])

    player, _ = Player.objects.update_or_create(
        source_name=source_key,
        source_uid=record["player_external_id"],
        defaults={
            "first_name": first_name,
            "last_name": last_name,
            "full_name": record["full_name"],
            "birth_date": _parse_optional_birth_date(record.get("birth_date")),
            "nationality": _resolve_nationality(record.get("nationality")),
            "nominal_position": position,
            "inferred_role": role,
            "height_cm": record.get("height_cm"),
            "weight_kg": record.get("weight_kg"),
            "is_active": True,
        },
    )

    games_played = int(record["games_played"])
    player_season, _ = PlayerSeason.objects.update_or_create(
        source_uid=_player_season_source_uid(
            record,
            source_name=source_key,
            snapshot_date=snapshot_date,
        ),
        defaults={
            "player": player,
            "season": season,
            "team": team,
            "competition": competition,
            "games_played": games_played,
            "games_started": 0,
            # Total minutes are reconstructed from the per-game average.
            "minutes_played": int(round(float(record["minutes_per_game"]) * games_played)),
        },
    )

    PlayerSeasonStats.objects.update_or_create(
        player_season=player_season,
        defaults={
            "points": _normalize_decimal(record["points_per_game"]),
            "rebounds": _normalize_decimal(record["rebounds_per_game"]),
            "assists": _normalize_decimal(record["assists_per_game"]),
            "steals": _normalize_decimal(record["steals_per_game"]),
            "blocks": _normalize_decimal(record["blocks_per_game"]),
            "turnovers": _normalize_decimal(record["turnovers_per_game"]),
            "fg_pct": _normalize_decimal(record["fg_pct"]),
            "three_pct": _normalize_decimal(record["three_pt_pct"]),
            "ft_pct": _normalize_decimal(record["ft_pct"]),
        },
    )


class SnapshotImporter:
    """Import ``.json`` snapshot files from a directory and archive them.

    Files are read from ``incoming_dir``. After processing, each file is
    moved to ``archive_dir`` (imported or skipped as a duplicate) or to
    ``failed_dir`` (import error).
    """

    def __init__(self, *, incoming_dir: str, archive_dir: str, failed_dir: str):
        """Store the three working directories as ``Path`` objects."""
        self.incoming_dir = Path(incoming_dir)
        self.archive_dir = Path(archive_dir)
        self.failed_dir = Path(failed_dir)

    def _list_input_files(self) -> list[Path]:
        """Return the sorted ``.json`` files (case-insensitive suffix) in ``incoming_dir``.

        Returns an empty list when the directory does not exist.
        """
        if not self.incoming_dir.exists():
            return []
        return sorted(
            path
            for path in self.incoming_dir.iterdir()
            if path.is_file() and path.suffix.lower() == ".json"
        )

    def run(self, *, triggered_by=None) -> ImportRun:
        """Process every pending snapshot file and return the finished ``ImportRun``.

        For each file an ``ImportFile`` row is created. The file is then
        either skipped (a file with the same checksum was already imported
        successfully), imported inside a single transaction, or marked as
        failed. The run ends as SUCCESS only when no failure was counted;
        each failed file counts as one failure.

        Args:
            triggered_by: Stored on the ``ImportRun`` row as ``triggered_by``.
        """
        run = ImportRun.objects.create(
            source="static_snapshot_json",
            status=ImportRun.RunStatus.RUNNING,
            triggered_by=triggered_by,
            started_at=timezone.now(),
            context={
                "incoming_dir": str(self.incoming_dir),
                "archive_dir": str(self.archive_dir),
                "failed_dir": str(self.failed_dir),
            },
        )

        summary = ImportSummary()
        files = self._list_input_files()
        summary.files_total = len(files)

        for path in files:
            checksum = _file_checksum(path)
            file_row = ImportFile.objects.create(
                import_run=run,
                relative_path=path.name,
                status=ImportFile.FileStatus.PROCESSING,
                checksum=checksum,
                file_size_bytes=path.stat().st_size,
            )

            # Duplicate file content previously imported successfully.
            already_imported = ImportFile.objects.filter(
                checksum=checksum,
                status=ImportFile.FileStatus.SUCCESS,
            ).exclude(pk=file_row.pk).exists()
            if already_imported:
                file_row.status = ImportFile.FileStatus.SKIPPED
                file_row.error_message = "Skipped duplicate checksum already imported successfully."
                file_row.processed_at = timezone.now()
                file_row.save(update_fields=["status", "error_message", "processed_at"])
                _safe_move(path, self.archive_dir)
                summary.files_processed += 1
                continue

            try:
                payload = json.loads(path.read_text(encoding="utf-8"))
                validated = SnapshotSchemaValidator.validate(payload)

                file_row.source_name = validated.source_name
                file_row.snapshot_date = validated.snapshot_date
                file_row.rows_total = len(validated.records)

                # All records of a file are committed together or not at all.
                with transaction.atomic():
                    for record in validated.records:
                        _upsert_record(
                            record,
                            source_name=validated.source_name,
                            snapshot_date=validated.snapshot_date,
                        )

                file_row.status = ImportFile.FileStatus.SUCCESS
                file_row.rows_upserted = len(validated.records)
                file_row.payload_preview = {
                    "source_name": validated.source_name,
                    "snapshot_date": validated.snapshot_date.isoformat(),
                    # Guard against an empty record list: indexing [0] would
                    # raise after the transaction committed and wrongly flag
                    # the file as failed.
                    "sample_record": validated.records[0] if validated.records else None,
                }
                # NOTE(review): if this move raises, the broad handler below
                # marks the file FAILED although its rows are already committed.
                _safe_move(path, self.archive_dir)
            except (json.JSONDecodeError, SnapshotValidationError, ValueError) as exc:
                file_row.status = ImportFile.FileStatus.FAILED
                file_row.error_message = str(exc)
                _safe_move(path, self.failed_dir)
            except Exception as exc:  # noqa: BLE001
                # Per-file boundary: one broken file must not abort the run.
                file_row.status = ImportFile.FileStatus.FAILED
                file_row.error_message = f"Unhandled import error: {exc}"
                _safe_move(path, self.failed_dir)

            file_row.processed_at = timezone.now()
            file_row.save(
                update_fields=[
                    "source_name",
                    "snapshot_date",
                    "status",
                    "rows_total",
                    "rows_upserted",
                    "rows_failed",
                    "error_message",
                    "payload_preview",
                    "processed_at",
                ]
            )

            summary.files_processed += 1
            summary.rows_total += file_row.rows_total
            summary.rows_upserted += file_row.rows_upserted
            # A failed file adds one failure on top of any row-level failures.
            summary.rows_failed += file_row.rows_failed + (
                1 if file_row.status == ImportFile.FileStatus.FAILED else 0
            )

        run.status = (
            ImportRun.RunStatus.SUCCESS if summary.rows_failed == 0 else ImportRun.RunStatus.FAILED
        )
        run.files_total = summary.files_total
        run.files_processed = summary.files_processed
        run.rows_total = summary.rows_total
        run.rows_upserted = summary.rows_upserted
        run.rows_failed = summary.rows_failed
        run.finished_at = timezone.now()
        if summary.rows_failed:
            run.error_summary = f"{summary.rows_failed} file/row import error(s)."
        run.save(
            update_fields=[
                "status",
                "files_total",
                "files_processed",
                "rows_total",
                "rows_upserted",
                "rows_failed",
                "error_summary",
                "finished_at",
            ]
        )
        return run