import logging
from collections.abc import Callable
from dataclasses import dataclass
from datetime import date

from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.db import transaction
from django.utils.dateparse import parse_date

from apps.competitions.models import Competition, Season
from apps.ingestion.models import IngestionRun
from apps.ingestion.services.runs import (
    finish_ingestion_run,
    log_ingestion_error,
    start_ingestion_run,
    update_ingestion_run_progress,
)
from apps.players.models import Nationality, Player, PlayerAlias, PlayerCareerEntry, Position, Role
from apps.players.services.origin import refresh_player_origin
from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError
from apps.providers.registry import get_provider
from apps.providers.services.mappings import upsert_external_mapping
from apps.stats.models import PlayerSeason, PlayerSeasonStats
from apps.teams.models import Team

logger = logging.getLogger(__name__)


@dataclass
class SyncSummary:
    """Running counters for one sync step or for a whole ingestion run."""

    processed: int = 0  # records seen, including the ones that failed
    created: int = 0  # records that produced a new row
    updated: int = 0  # records that updated an existing row
    failed: int = 0  # records skipped after logging an ingestion error


def _parse_date(value):
    """Coerce a provider-supplied date value into a ``date``, or ``None``.

    Falsy values yield ``None`` and ``date`` instances are returned untouched.
    Anything else is handed to Django's ``parse_date``. Values it cannot turn
    into a date -- including well-formatted but impossible dates, for which
    ``parse_date`` raises ``ValueError``, and non-string inputs -- yield
    ``None`` instead of aborting the whole run.
    """
    if not value:
        return None
    if isinstance(value, date):
        return value
    try:
        return parse_date(value)
    except (TypeError, ValueError):
        # Treat "well-formatted but invalid" exactly like "badly formatted":
        # callers already handle a ``None`` result.
        logger.warning("Ignoring unparseable date value %r", value)
        return None


def _upsert_nationality(payload: dict | None):
    """Create or update the ``Nationality`` keyed by ``payload["iso2_code"]``.

    Returns ``None`` when the payload is empty or carries no ``iso2_code``.
    The name falls back to the ISO2 code when the payload has no ``name`` key.
    """
    if not payload:
        return None
    iso2_code = payload.get("iso2_code")
    if not iso2_code:
        return None
    nationality, _ = Nationality.objects.update_or_create(
        iso2_code=iso2_code,
        defaults={
            "name": payload.get("name", iso2_code),
            "iso3_code": payload.get("iso3_code"),
        },
    )
    return nationality


def _upsert_position(payload: dict | None):
    """Create or update the ``Position`` keyed by ``payload["code"]``.

    Returns ``None`` when the payload is empty or has no ``code``.
    """
    if not payload or not payload.get("code"):
        return None
    position, _ = Position.objects.update_or_create(
        code=payload["code"],
        defaults={"name": payload.get("name", payload["code"])},
    )
    return position


def _upsert_role(payload: dict | None):
    """Create or update the ``Role`` keyed by ``payload["code"]``.

    Returns ``None`` when the payload is empty or has no ``code``.
    """
    if not payload or not payload.get("code"):
        return None
    role, _ = Role.objects.update_or_create(
        code=payload["code"],
        defaults={"name": payload.get("name", payload["code"])},
    )
    return role


def _resolve_mapped_entity(model_cls, provider_namespace: str, external_id: str):
    """Return the ``model_cls`` row linked to an active external mapping.

    Looks for an active mapping with the given provider namespace and external
    id on the content type of ``model_cls`` and loads the referenced object.
    Returns ``None`` when no mapping exists or the mapped object is gone.
    """
    ctype = ContentType.objects.get_for_model(model_cls)
    # NOTE(review): ``externalmapping_set`` is presumably the reverse accessor
    # of the external-mapping model's content-type FK -- confirm against the
    # providers app.
    object_id = (
        ctype.externalmapping_set.filter(
            provider_namespace=provider_namespace,
            external_id=external_id,
            is_active=True,
        )
        .values_list("object_id", flat=True)
        .first()
    )
    if object_id:
        return model_cls.objects.filter(pk=object_id).first()
    return None


def _sync_competitions(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
    """Upsert competitions from provider payloads and record their mappings.

    A payload without ``external_id`` is counted as failed and logged. An
    already-mapped competition is updated in place; otherwise the competition
    is matched (or created) by slug. ``summary`` is mutated in place.
    """
    for payload in payloads:
        summary.processed += 1
        external_id = payload.get("external_id")
        if not external_id:
            summary.failed += 1
            log_ingestion_error(
                run=run,
                provider_namespace=provider_namespace,
                entity_type="competition",
                message="Missing external_id",
                raw_payload=payload,
            )
            continue

        country = _upsert_nationality(payload.get("country"))
        defaults = {
            "name": payload.get("name", external_id),
            "slug": payload.get("slug", external_id),
            "competition_type": payload.get("competition_type", Competition.CompetitionType.LEAGUE),
            "gender": payload.get("gender", Competition.Gender.MEN),
            "level": payload.get("level", 1),
            "country": country,
            "is_active": payload.get("is_active", True),
        }

        competition = _resolve_mapped_entity(Competition, provider_namespace, external_id)
        if competition:
            for key, value in defaults.items():
                setattr(competition, key, value)
            competition.save()
            summary.updated += 1
        else:
            competition, created = Competition.objects.update_or_create(
                slug=defaults["slug"],
                defaults=defaults,
            )
            if created:
                summary.created += 1
            else:
                summary.updated += 1

        upsert_external_mapping(
            provider_namespace=provider_namespace,
            external_id=external_id,
            instance=competition,
            raw_payload=payload,
        )


def _sync_teams(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
    """Upsert teams from provider payloads and record their mappings.

    A payload without ``external_id`` is counted as failed and logged. An
    already-mapped team is updated in place; otherwise the team is matched
    (or created) by slug. ``summary`` is mutated in place.
    """
    for payload in payloads:
        summary.processed += 1
        external_id = payload.get("external_id")
        if not external_id:
            summary.failed += 1
            log_ingestion_error(
                run=run,
                provider_namespace=provider_namespace,
                entity_type="team",
                message="Missing external_id",
                raw_payload=payload,
            )
            continue

        country = _upsert_nationality(payload.get("country"))
        defaults = {
            "name": payload.get("name", external_id),
            "short_name": payload.get("short_name", ""),
            "slug": payload.get("slug", external_id),
            "country": country,
            "is_national_team": payload.get("is_national_team", False),
        }

        team = _resolve_mapped_entity(Team, provider_namespace, external_id)
        if team:
            for key, value in defaults.items():
                setattr(team, key, value)
            team.save()
            summary.updated += 1
        else:
            team, created = Team.objects.update_or_create(slug=defaults["slug"], defaults=defaults)
            if created:
                summary.created += 1
            else:
                summary.updated += 1

        upsert_external_mapping(
            provider_namespace=provider_namespace,
            external_id=external_id,
            instance=team,
            raw_payload=payload,
        )


def _sync_seasons(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
    """Upsert seasons (keyed by label) and record their mappings.

    Payloads without ``external_id``, or whose start/end date is missing or
    cannot be parsed, are counted as failed and logged. ``summary`` is mutated
    in place.
    """
    for payload in payloads:
        summary.processed += 1
        external_id = payload.get("external_id")
        if not external_id:
            summary.failed += 1
            log_ingestion_error(
                run=run,
                provider_namespace=provider_namespace,
                entity_type="season",
                message="Missing external_id",
                raw_payload=payload,
            )
            continue

        start_date = _parse_date(payload.get("start_date"))
        end_date = _parse_date(payload.get("end_date"))
        if not start_date or not end_date:
            summary.failed += 1
            log_ingestion_error(
                run=run,
                provider_namespace=provider_namespace,
                entity_type="season",
                external_id=external_id,
                message="Invalid season dates",
                raw_payload=payload,
            )
            continue

        defaults = {
            "start_date": start_date,
            "end_date": end_date,
            "is_current": payload.get("is_current", False),
        }
        season, created = Season.objects.update_or_create(
            label=payload.get("label", external_id),
            defaults=defaults,
        )
        if created:
            summary.created += 1
        else:
            summary.updated += 1

        upsert_external_mapping(
            provider_namespace=provider_namespace,
            external_id=external_id,
            instance=season,
            raw_payload=payload,
        )


def _sync_players(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
    """Upsert players, refresh their provider aliases and record mappings.

    A payload without ``external_id`` is counted as failed and logged. An
    already-mapped player is updated in place; otherwise the player is matched
    (or created) by ``full_name`` + ``birth_date``. Aliases whose source is
    this provider are deleted and re-created from the payload. ``summary`` is
    mutated in place.
    """
    for payload in payloads:
        summary.processed += 1
        external_id = payload.get("external_id")
        if not external_id:
            summary.failed += 1
            log_ingestion_error(
                run=run,
                provider_namespace=provider_namespace,
                entity_type="player",
                message="Missing external_id",
                raw_payload=payload,
            )
            continue

        nationality = _upsert_nationality(payload.get("nationality"))
        nominal_position = _upsert_position(payload.get("nominal_position"))
        inferred_role = _upsert_role(payload.get("inferred_role"))

        defaults = {
            "first_name": payload.get("first_name", ""),
            "last_name": payload.get("last_name", ""),
            "full_name": payload.get("full_name", external_id),
            "birth_date": _parse_date(payload.get("birth_date")),
            "nationality": nationality,
            "nominal_position": nominal_position,
            "inferred_role": inferred_role,
            "height_cm": payload.get("height_cm"),
            "weight_kg": payload.get("weight_kg"),
            "dominant_hand": payload.get("dominant_hand", Player.DominantHand.UNKNOWN),
            "is_active": payload.get("is_active", True),
        }

        player = _resolve_mapped_entity(Player, provider_namespace, external_id)
        if player:
            for key, value in defaults.items():
                setattr(player, key, value)
            player.save()
            summary.updated += 1
        else:
            player, created = Player.objects.update_or_create(
                full_name=defaults["full_name"],
                birth_date=defaults["birth_date"],
                defaults=defaults,
            )
            if created:
                summary.created += 1
            else:
                summary.updated += 1

        PlayerAlias.objects.filter(player=player, source=provider_namespace).delete()
        # ``or []`` also covers an explicit ``"aliases": null`` in the payload.
        for alias in payload.get("aliases") or []:
            PlayerAlias.objects.get_or_create(
                player=player,
                alias=alias,
                defaults={"source": provider_namespace, "is_primary": False},
            )

        upsert_external_mapping(
            provider_namespace=provider_namespace,
            external_id=external_id,
            instance=player,
            raw_payload=payload,
        )


def _sync_player_stats(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
    """Upsert ``PlayerSeason`` rows and their ``PlayerSeasonStats``.

    Player and season must already be mapped for this provider; otherwise the
    record is counted as failed and logged. Team and competition are optional
    and resolve to ``None`` when unmapped. ``summary`` is mutated in place.
    """
    for payload in payloads:
        summary.processed += 1
        external_id = payload.get("external_id", "")

        # Resolve the mandatory mappings first so rejected records do not pay
        # for the optional team/competition lookups.
        player = _resolve_mapped_entity(Player, provider_namespace, payload.get("player_external_id", ""))
        season = _resolve_mapped_entity(Season, provider_namespace, payload.get("season_external_id", ""))
        if not player or not season:
            summary.failed += 1
            log_ingestion_error(
                run=run,
                provider_namespace=provider_namespace,
                entity_type="player_stats",
                external_id=external_id,
                message="Player/season mapping missing",
                raw_payload=payload,
            )
            continue

        team = _resolve_mapped_entity(Team, provider_namespace, payload.get("team_external_id", ""))
        competition = _resolve_mapped_entity(
            Competition, provider_namespace, payload.get("competition_external_id", "")
        )

        player_season, created = PlayerSeason.objects.update_or_create(
            player=player,
            season=season,
            team=team,
            competition=competition,
            defaults={
                "games_played": payload.get("games_played", 0),
                "games_started": payload.get("games_started", 0),
                "minutes_played": payload.get("minutes_played", 0),
            },
        )
        if created:
            summary.created += 1
        else:
            summary.updated += 1

        PlayerSeasonStats.objects.update_or_create(
            player_season=player_season,
            defaults={
                "points": payload.get("points", 0),
                "rebounds": payload.get("rebounds", 0),
                "assists": payload.get("assists", 0),
                "steals": payload.get("steals", 0),
                "blocks": payload.get("blocks", 0),
                "turnovers": payload.get("turnovers", 0),
                "fg_pct": payload.get("fg_pct"),
                "three_pct": payload.get("three_pct"),
                "ft_pct": payload.get("ft_pct"),
                "usage_rate": payload.get("usage_rate"),
                "true_shooting_pct": payload.get("true_shooting_pct"),
                "player_efficiency_rating": payload.get("player_efficiency_rating"),
            },
        )


def _sync_player_careers(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
    """Upsert ``PlayerCareerEntry`` rows and refresh affected players' origin.

    A record whose player is not mapped for this provider is counted as failed
    and logged. Team, competition, season and role are optional. After all
    records are processed, ``refresh_player_origin`` is called once for every
    player that received an entry. ``summary`` is mutated in place.
    """
    touched_player_ids: set[int] = set()

    for payload in payloads:
        summary.processed += 1
        external_id = payload.get("external_id", "")

        # The player is the only mandatory mapping; check it before running
        # the remaining lookups.
        player = _resolve_mapped_entity(Player, provider_namespace, payload.get("player_external_id", ""))
        if not player:
            summary.failed += 1
            log_ingestion_error(
                run=run,
                provider_namespace=provider_namespace,
                entity_type="player_career",
                external_id=external_id,
                message="Player mapping missing",
                raw_payload=payload,
            )
            continue

        team = _resolve_mapped_entity(Team, provider_namespace, payload.get("team_external_id", ""))
        competition = _resolve_mapped_entity(
            Competition, provider_namespace, payload.get("competition_external_id", "")
        )
        season = _resolve_mapped_entity(Season, provider_namespace, payload.get("season_external_id", ""))
        role = Role.objects.filter(code=payload.get("role_code", "")).first()

        touched_player_ids.add(player.id)
        _, created = PlayerCareerEntry.objects.update_or_create(
            player=player,
            team=team,
            competition=competition,
            season=season,
            start_date=_parse_date(payload.get("start_date")),
            defaults={
                "end_date": _parse_date(payload.get("end_date")),
                "shirt_number": payload.get("shirt_number"),
                "role_snapshot": role,
                "notes": payload.get("notes", ""),
            },
        )
        if created:
            summary.created += 1
        else:
            summary.updated += 1

    if touched_player_ids:
        for player in Player.objects.filter(id__in=touched_player_ids):
            refresh_player_origin(player)


def _finish_failed_run(run: IngestionRun, summary: SyncSummary, error_summary: str) -> None:
    """Close ``run`` as FAILED, counting the aborting error as one failure."""
    finish_ingestion_run(
        run=run,
        status=IngestionRun.RunStatus.FAILED,
        processed=summary.processed,
        created=summary.created,
        updated=summary.updated,
        failed=summary.failed + 1,
        error_summary=error_summary,
    )


def run_sync_job(
    *,
    provider_namespace: str,
    job_type: str,
    triggered_by_id: int | None = None,
    context: dict | None = None,
    cursor: str | None = None,
):
    """Run one ingestion job for a provider and return the ingestion run.

    Starts an ingestion run, fetches the provider payload and processes it
    step by step (competitions, teams, seasons, players, player stats, player
    careers), each step inside its own database transaction. Progress is
    recorded after every step.

    Args:
        provider_namespace: Namespace used to look up the provider and to
            scope external mappings.
        job_type: ``IngestionRun.JobType.INCREMENTAL`` calls
            ``provider.sync_incremental(cursor=cursor)``; any other value
            calls ``provider.sync_all()``.
        triggered_by_id: Optional primary key of the triggering user; an
            unknown id resolves to ``None``.
        context: Optional context stored on the run (defaults to ``{}``).
        cursor: Cursor forwarded to the provider for incremental syncs.

    Returns:
        The run object returned by ``start_ingestion_run``, finished with
        SUCCESS status.

    Raises:
        ProviderRateLimitError, ProviderTransientError, Exception: Any error
            raised while syncing is logged, the run is finished as FAILED,
            and the exception is re-raised.
    """
    UserModel = get_user_model()
    triggered_by = None
    if triggered_by_id:
        triggered_by = UserModel.objects.filter(pk=triggered_by_id).first()

    run = start_ingestion_run(
        provider_namespace=provider_namespace,
        job_type=job_type,
        triggered_by=triggered_by,
        context=context or {},
    )
    summary = SyncSummary()
    logger.info(
        "Starting ingestion run id=%s provider=%s job_type=%s",
        run.id,
        provider_namespace,
        job_type,
    )

    try:
        provider = get_provider(provider_namespace)
        payload = (
            provider.sync_incremental(cursor=cursor)
            if job_type == IngestionRun.JobType.INCREMENTAL
            else provider.sync_all()
        )

        # Order matters: later steps resolve entities mapped by earlier ones.
        # Built per call so the step functions are looked up at call time.
        step_handlers: list[tuple[str, Callable[..., None]]] = [
            ("competitions", _sync_competitions),
            ("teams", _sync_teams),
            ("seasons", _sync_seasons),
            ("players", _sync_players),
            ("player_stats", _sync_player_stats),
            ("player_careers", _sync_player_careers),
        ]
        # Read each section once; ``or []`` also tolerates explicit nulls.
        records_by_step = {name: payload.get(name) or [] for name, _ in step_handlers}
        source_counts = {name: len(records) for name, records in records_by_step.items()}

        for step_name, step_fn in step_handlers:
            step_summary = SyncSummary()
            with transaction.atomic():
                step_fn(provider_namespace, records_by_step[step_name], run, step_summary)

            summary.processed += step_summary.processed
            summary.created += step_summary.created
            summary.updated += step_summary.updated
            summary.failed += step_summary.failed

            update_ingestion_run_progress(
                run=run,
                completed_step=step_name,
                step_summary={
                    "processed": step_summary.processed,
                    "created": step_summary.created,
                    "updated": step_summary.updated,
                    "failed": step_summary.failed,
                },
                source_counts=source_counts,
            )
            logger.info(
                "Completed ingestion step run_id=%s step=%s processed=%s created=%s updated=%s failed=%s",
                run.id,
                step_name,
                step_summary.processed,
                step_summary.created,
                step_summary.updated,
                step_summary.failed,
            )

        success_error_summary = ""
        if summary.failed > 0:
            success_error_summary = f"Completed with {summary.failed} failed record(s)."

        finish_ingestion_run(
            run=run,
            status=IngestionRun.RunStatus.SUCCESS,
            processed=summary.processed,
            created=summary.created,
            updated=summary.updated,
            failed=summary.failed,
            error_summary=success_error_summary,
        )
        logger.info(
            "Completed ingestion run id=%s status=%s processed=%s created=%s updated=%s failed=%s",
            run.id,
            IngestionRun.RunStatus.SUCCESS,
            summary.processed,
            summary.created,
            summary.updated,
            summary.failed,
        )
        return run
    except ProviderRateLimitError as exc:
        logger.warning("Rate limit from provider %s: %s", provider_namespace, exc)
        log_ingestion_error(
            run=run,
            provider_namespace=provider_namespace,
            entity_type="provider",
            severity="warning",
            message=f"Rate limit: {exc}",
            raw_payload={"retry_after_seconds": exc.retry_after_seconds},
        )
        _finish_failed_run(run, summary, f"Rate limit from provider: {exc}")
        raise
    except ProviderTransientError as exc:
        logger.exception("Transient provider error for namespace %s", provider_namespace)
        log_ingestion_error(
            run=run,
            provider_namespace=provider_namespace,
            entity_type="provider",
            severity="error",
            message=str(exc),
        )
        _finish_failed_run(run, summary, f"Transient provider error: {exc}")
        raise
    except Exception as exc:  # noqa: BLE001
        logger.exception("Unhandled ingestion error")
        log_ingestion_error(
            run=run,
            provider_namespace=provider_namespace,
            entity_type="sync",
            severity="critical",
            message=str(exc),
        )
        _finish_failed_run(run, summary, f"Unhandled ingestion error: {exc}")
        raise