From ecd665e87278b7d91c584e5be72420ecfc8fbfe9 Mon Sep 17 00:00:00 2001 From: Alfredo Di Stasio Date: Tue, 10 Mar 2026 11:05:57 +0100 Subject: [PATCH] phase6: add provider abstraction, mvp adapter, and ingestion sync tasks --- .env.example | 8 + apps/ingestion/admin.py | 63 ++- apps/ingestion/services/__init__.py | 9 + apps/ingestion/services/sync.py | 506 ++++++++++++++++++++++++ apps/ingestion/tasks.py | 38 ++ apps/providers/adapters/mvp_provider.py | 99 +++++ apps/providers/data/mvp_provider.json | 152 +++++++ apps/providers/exceptions.py | 18 + apps/providers/interfaces.py | 45 +++ apps/providers/registry.py | 17 + config/settings/base.py | 10 + tests/test_ingestion_sync.py | 42 ++ 12 files changed, 1006 insertions(+), 1 deletion(-) create mode 100644 apps/ingestion/services/sync.py create mode 100644 apps/ingestion/tasks.py create mode 100644 apps/providers/adapters/mvp_provider.py create mode 100644 apps/providers/data/mvp_provider.json create mode 100644 apps/providers/exceptions.py create mode 100644 apps/providers/interfaces.py create mode 100644 apps/providers/registry.py create mode 100644 tests/test_ingestion_sync.py diff --git a/.env.example b/.env.example index 0091c7a..7d26c8a 100644 --- a/.env.example +++ b/.env.example @@ -27,3 +27,11 @@ CELERY_RESULT_BACKEND=redis://redis:6379/0 AUTO_APPLY_MIGRATIONS=1 AUTO_COLLECTSTATIC=1 GUNICORN_WORKERS=3 + +# Providers / ingestion +PROVIDER_DEFAULT_NAMESPACE=mvp_demo +PROVIDER_MVP_DATA_FILE=/app/apps/providers/data/mvp_provider.json +PROVIDER_REQUEST_RETRIES=3 +PROVIDER_REQUEST_RETRY_SLEEP=1 +CELERY_TASK_TIME_LIMIT=1800 +CELERY_TASK_SOFT_TIME_LIMIT=1500 diff --git a/apps/ingestion/admin.py b/apps/ingestion/admin.py index 9edd789..4824e72 100644 --- a/apps/ingestion/admin.py +++ b/apps/ingestion/admin.py @@ -1,12 +1,14 @@ from django.contrib import admin +from django.contrib import messages from .models import IngestionError, IngestionRun +from .tasks import trigger_full_sync, trigger_incremental_sync class IngestionErrorInline(admin.TabularInline): model = IngestionError extra = 0 - readonly_fields = ("occurred_at",) + readonly_fields = ("provider_namespace", "entity_type", "external_id", "severity", "message", "occurred_at") @admin.register(IngestionRun) @@ -23,6 +25,55 @@ class IngestionRunAdmin(admin.ModelAdmin): list_filter = ("provider_namespace", "job_type", "status") search_fields = ("provider_namespace",) inlines = (IngestionErrorInline,) + readonly_fields = ( + "provider_namespace", + "job_type", + "status", + "triggered_by", + "started_at", + "finished_at", + "records_processed", + "records_created", + "records_updated", + "records_failed", + "context", + "raw_payload", + "created_at", + ) + actions = ( + "enqueue_full_sync_mvp", + "enqueue_incremental_sync_mvp", + "retry_selected_runs", + ) + + @admin.action(description="Queue full MVP sync") + def enqueue_full_sync_mvp(self, request, queryset): + trigger_full_sync.delay(provider_namespace="mvp_demo", triggered_by_id=request.user.id) + self.message_user(request, "Queued full MVP sync task.", level=messages.SUCCESS) + + @admin.action(description="Queue incremental MVP sync") + def enqueue_incremental_sync_mvp(self, request, queryset): + trigger_incremental_sync.delay(provider_namespace="mvp_demo", triggered_by_id=request.user.id) + self.message_user(request, "Queued incremental MVP sync task.", level=messages.SUCCESS) + + @admin.action(description="Retry selected ingestion runs") + def retry_selected_runs(self, request, queryset): + count = 0 + for run in queryset: + if run.job_type == IngestionRun.JobType.INCREMENTAL: + trigger_incremental_sync.delay( + provider_namespace=run.provider_namespace, + triggered_by_id=request.user.id, + context={"retry_of": run.id}, + ) + else: + trigger_full_sync.delay( + provider_namespace=run.provider_namespace, + triggered_by_id=request.user.id, + context={"retry_of": run.id}, + ) + count += 1 + self.message_user(request, f"Queued {count} retry task(s).", level=messages.SUCCESS) @admin.register(IngestionError) @@ -30,3 +81,13 @@ class IngestionErrorAdmin(admin.ModelAdmin): list_display = ("provider_namespace", "entity_type", "external_id", "severity", "occurred_at") list_filter = ("severity", "provider_namespace") search_fields = ("entity_type", "external_id", "message") + readonly_fields = ( + "ingestion_run", + "provider_namespace", + "entity_type", + "external_id", + "severity", + "message", + "raw_payload", + "occurred_at", + ) diff --git a/apps/ingestion/services/__init__.py b/apps/ingestion/services/__init__.py index e69de29..4327fb2 100644 --- a/apps/ingestion/services/__init__.py +++ b/apps/ingestion/services/__init__.py @@ -0,0 +1,9 @@ +from .runs import finish_ingestion_run, log_ingestion_error, start_ingestion_run +from .sync import run_sync_job + +__all__ = [ + "start_ingestion_run", + "finish_ingestion_run", + "log_ingestion_error", + "run_sync_job", +] diff --git a/apps/ingestion/services/sync.py b/apps/ingestion/services/sync.py new file mode 100644 index 0000000..33c6a8d --- /dev/null +++ b/apps/ingestion/services/sync.py @@ -0,0 +1,506 @@ +import logging +from dataclasses import dataclass +from datetime import date + +from django.contrib.contenttypes.models import ContentType +from django.contrib.auth import get_user_model +from django.db import transaction +from django.utils.dateparse import parse_date + +from apps.competitions.models import Competition, Season +from apps.ingestion.models import IngestionRun +from apps.ingestion.services.runs import finish_ingestion_run, log_ingestion_error, start_ingestion_run +from apps.players.models import Nationality, Player, PlayerAlias, PlayerCareerEntry, Position, Role +from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError +from apps.providers.registry import get_provider +from apps.providers.services.mappings import upsert_external_mapping +from apps.stats.models import PlayerSeason, PlayerSeasonStats +from apps.teams.models import Team + +logger = logging.getLogger(__name__) + + +@dataclass +class SyncSummary: + processed: int = 0 + created: int = 0 + updated: int = 0 + failed: int = 0 + + +def _parse_date(value): + if not value: + return None + if isinstance(value, date): + return value + return parse_date(value) + + +def _upsert_nationality(payload: dict | None): + if not payload: + return None + iso2_code = payload.get("iso2_code") + if not iso2_code: + return None + nationality, _ = Nationality.objects.update_or_create( + iso2_code=iso2_code, + defaults={ + "name": payload.get("name", iso2_code), + "iso3_code": payload.get("iso3_code"), + }, + ) + return nationality + + +def _upsert_position(payload: dict | None): + if not payload or not payload.get("code"): + return None + position, _ = Position.objects.update_or_create( + code=payload["code"], + defaults={"name": payload.get("name", payload["code"])}, + ) + return position + + +def _upsert_role(payload: dict | None): + if not payload or not payload.get("code"): + return None + role, _ = Role.objects.update_or_create( + code=payload["code"], + defaults={"name": payload.get("name", payload["code"])}, + ) + return role + + +def _resolve_mapped_entity(model_cls, provider_namespace: str, external_id: str): + ctype = ContentType.objects.get_for_model(model_cls) + object_id = ( + ctype.externalmapping_set.filter( + provider_namespace=provider_namespace, + external_id=external_id, + is_active=True, + ) + .values_list("object_id", flat=True) + .first() + ) + if object_id: + return model_cls.objects.filter(pk=object_id).first() + return None + + +def _sync_competitions(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary): + for payload in payloads: + summary.processed += 1 + external_id = payload.get("external_id") + if not external_id: + summary.failed += 1 + log_ingestion_error( + run=run, + provider_namespace=provider_namespace, + entity_type="competition", + message="Missing external_id", + raw_payload=payload, + ) + continue + + country = _upsert_nationality(payload.get("country")) + defaults = { + "name": payload.get("name", external_id), + "slug": payload.get("slug", external_id), + "competition_type": payload.get("competition_type", Competition.CompetitionType.LEAGUE), + "gender": payload.get("gender", Competition.Gender.MEN), + "level": payload.get("level", 1), + "country": country, + "is_active": payload.get("is_active", True), + } + + competition = _resolve_mapped_entity(Competition, provider_namespace, external_id) + if competition: + for key, value in defaults.items(): + setattr(competition, key, value) + competition.save() + summary.updated += 1 + else: + competition, created = Competition.objects.update_or_create( + slug=defaults["slug"], + defaults=defaults, + ) + if created: + summary.created += 1 + else: + summary.updated += 1 + + upsert_external_mapping( + provider_namespace=provider_namespace, + external_id=external_id, + instance=competition, + raw_payload=payload, + ) + + +def _sync_teams(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary): + for payload in payloads: + summary.processed += 1 + external_id = payload.get("external_id") + if not external_id: + summary.failed += 1 + log_ingestion_error( + run=run, + provider_namespace=provider_namespace, + entity_type="team", + message="Missing external_id", + raw_payload=payload, + ) + continue + + country = _upsert_nationality(payload.get("country")) + defaults = { + "name": payload.get("name", external_id), + "short_name": payload.get("short_name", ""), + "slug": payload.get("slug", external_id), + "country": country, + "is_national_team": payload.get("is_national_team", False), + } + + team = _resolve_mapped_entity(Team, provider_namespace, external_id) + if team: + for key, value in defaults.items(): + setattr(team, key, value) + team.save() + summary.updated += 1 + else: + team, created = Team.objects.update_or_create(slug=defaults["slug"], defaults=defaults) + if created: + summary.created += 1 + else: + summary.updated += 1 + + upsert_external_mapping( + provider_namespace=provider_namespace, + external_id=external_id, + instance=team, + raw_payload=payload, + ) + + +def _sync_seasons(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary): + for payload in payloads: + summary.processed += 1 + external_id = payload.get("external_id") + if not external_id: + summary.failed += 1 + log_ingestion_error( + run=run, + provider_namespace=provider_namespace, + entity_type="season", + message="Missing external_id", + raw_payload=payload, + ) + continue + + start_date = _parse_date(payload.get("start_date")) + end_date = _parse_date(payload.get("end_date")) + if not start_date or not end_date: + summary.failed += 1 + log_ingestion_error( + run=run, + provider_namespace=provider_namespace, + entity_type="season", + external_id=external_id, + message="Invalid season dates", + raw_payload=payload, + ) + continue + + defaults = { + "start_date": start_date, + "end_date": end_date, + "is_current": payload.get("is_current", False), + } + season, created = Season.objects.update_or_create( + label=payload.get("label", external_id), + defaults=defaults, + ) + if created: + summary.created += 1 + else: + summary.updated += 1 + + upsert_external_mapping( + provider_namespace=provider_namespace, + external_id=external_id, + instance=season, + raw_payload=payload, + ) + + +def _sync_players(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary): + for payload in payloads: + summary.processed += 1 + external_id = payload.get("external_id") + if not external_id: + summary.failed += 1 + log_ingestion_error( + run=run, + provider_namespace=provider_namespace, + entity_type="player", + message="Missing external_id", + raw_payload=payload, + ) + continue + + nationality = _upsert_nationality(payload.get("nationality")) + nominal_position = _upsert_position(payload.get("nominal_position")) + inferred_role = _upsert_role(payload.get("inferred_role")) + + defaults = { + "first_name": payload.get("first_name", ""), + "last_name": payload.get("last_name", ""), + "full_name": payload.get("full_name", external_id), + "birth_date": _parse_date(payload.get("birth_date")), + "nationality": nationality, + "nominal_position": nominal_position, + "inferred_role": inferred_role, + "height_cm": payload.get("height_cm"), + "weight_kg": payload.get("weight_kg"), + "dominant_hand": payload.get("dominant_hand", Player.DominantHand.UNKNOWN), + "is_active": payload.get("is_active", True), + } + + player = _resolve_mapped_entity(Player, provider_namespace, external_id) + if player: + for key, value in defaults.items(): + setattr(player, key, value) + player.save() + summary.updated += 1 + else: + player, created = Player.objects.update_or_create( + full_name=defaults["full_name"], + birth_date=defaults["birth_date"], + defaults=defaults, + ) + if created: + summary.created += 1 + else: + summary.updated += 1 + + PlayerAlias.objects.filter(player=player, source=provider_namespace).delete() + for alias in payload.get("aliases", []): + PlayerAlias.objects.get_or_create( + player=player, + alias=alias, + defaults={"source": provider_namespace, "is_primary": False}, + ) + + upsert_external_mapping( + provider_namespace=provider_namespace, + external_id=external_id, + instance=player, + raw_payload=payload, + ) + + +def _sync_player_stats(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary): + for payload in payloads: + summary.processed += 1 + external_id = payload.get("external_id", "") + + player = _resolve_mapped_entity(Player, provider_namespace, payload.get("player_external_id", "")) + team = _resolve_mapped_entity(Team, provider_namespace, payload.get("team_external_id", "")) + competition = _resolve_mapped_entity(Competition, provider_namespace, payload.get("competition_external_id", "")) + season = _resolve_mapped_entity(Season, provider_namespace, payload.get("season_external_id", "")) + + if not player or not season: + summary.failed += 1 + log_ingestion_error( + run=run, + provider_namespace=provider_namespace, + entity_type="player_stats", + external_id=external_id, + message="Player/season mapping missing", + raw_payload=payload, + ) + continue + + player_season, created = PlayerSeason.objects.update_or_create( + player=player, + season=season, + team=team, + competition=competition, + defaults={ + "games_played": payload.get("games_played", 0), + "games_started": payload.get("games_started", 0), + "minutes_played": payload.get("minutes_played", 0), + }, + ) + if created: + summary.created += 1 + else: + summary.updated += 1 + + PlayerSeasonStats.objects.update_or_create( + player_season=player_season, + defaults={ + "points": payload.get("points", 0), + "rebounds": payload.get("rebounds", 0), + "assists": payload.get("assists", 0), + "steals": payload.get("steals", 0), + "blocks": payload.get("blocks", 0), + "turnovers": payload.get("turnovers", 0), + "fg_pct": payload.get("fg_pct"), + "three_pct": payload.get("three_pct"), + "ft_pct": payload.get("ft_pct"), + "usage_rate": payload.get("usage_rate"), + "true_shooting_pct": payload.get("true_shooting_pct"), + "player_efficiency_rating": payload.get("player_efficiency_rating"), + }, + ) + + +def _sync_player_careers(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary): + for payload in payloads: + summary.processed += 1 + external_id = payload.get("external_id", "") + + player = _resolve_mapped_entity(Player, provider_namespace, payload.get("player_external_id", "")) + team = _resolve_mapped_entity(Team, provider_namespace, payload.get("team_external_id", "")) + competition = _resolve_mapped_entity(Competition, provider_namespace, payload.get("competition_external_id", "")) + season = _resolve_mapped_entity(Season, provider_namespace, payload.get("season_external_id", "")) + role = Role.objects.filter(code=payload.get("role_code", "")).first() + + if not player: + summary.failed += 1 + log_ingestion_error( + run=run, + provider_namespace=provider_namespace, + entity_type="player_career", + external_id=external_id, + message="Player mapping missing", + raw_payload=payload, + ) + continue + + _, created = PlayerCareerEntry.objects.update_or_create( + player=player, + team=team, + competition=competition, + season=season, + start_date=_parse_date(payload.get("start_date")), + defaults={ + "end_date": _parse_date(payload.get("end_date")), + "shirt_number": payload.get("shirt_number"), + "role_snapshot": role, + "notes": payload.get("notes", ""), + }, + ) + + if created: + summary.created += 1 + else: + summary.updated += 1 + + +def run_sync_job( + *, + provider_namespace: str, + job_type: str, + triggered_by_id: int | None = None, + context: dict | None = None, + cursor: str | None = None, +): + UserModel = get_user_model() + triggered_by = None + if triggered_by_id: + triggered_by = UserModel.objects.filter(pk=triggered_by_id).first() + + run = start_ingestion_run( + provider_namespace=provider_namespace, + job_type=job_type, + triggered_by=triggered_by, + context=context or {}, + ) + summary = SyncSummary() + + try: + provider = get_provider(provider_namespace) + payload = ( + provider.sync_incremental(cursor=cursor) + if job_type == IngestionRun.JobType.INCREMENTAL + else provider.sync_all() + ) + + with transaction.atomic(): + _sync_competitions(provider_namespace, payload.get("competitions", []), run, summary) + _sync_teams(provider_namespace, payload.get("teams", []), run, summary) + _sync_seasons(provider_namespace, payload.get("seasons", []), run, summary) + _sync_players(provider_namespace, payload.get("players", []), run, summary) + _sync_player_stats(provider_namespace, payload.get("player_stats", []), run, summary) + _sync_player_careers(provider_namespace, payload.get("player_careers", []), run, summary) + + finish_ingestion_run( + run=run, + status=IngestionRun.RunStatus.SUCCESS, + processed=summary.processed, + created=summary.created, + updated=summary.updated, + failed=summary.failed, + ) + return run + + except ProviderRateLimitError as exc: + logger.warning("Rate limit from provider %s: %s", provider_namespace, exc) + log_ingestion_error( + run=run, + provider_namespace=provider_namespace, + entity_type="provider", + severity="warning", + message=f"Rate limit: {exc}", + raw_payload={"retry_after_seconds": exc.retry_after_seconds}, + ) + finish_ingestion_run( + run=run, + status=IngestionRun.RunStatus.FAILED, + processed=summary.processed, + created=summary.created, + updated=summary.updated, + failed=summary.failed + 1, + ) + raise + + except ProviderTransientError as exc: + logger.exception("Transient provider error for namespace %s", provider_namespace) + log_ingestion_error( + run=run, + provider_namespace=provider_namespace, + entity_type="provider", + severity="error", + message=str(exc), + ) + finish_ingestion_run( + run=run, + status=IngestionRun.RunStatus.FAILED, + processed=summary.processed, + created=summary.created, + updated=summary.updated, + failed=summary.failed + 1, + ) + raise + + except Exception as exc: # noqa: BLE001 + logger.exception("Unhandled ingestion error") + log_ingestion_error( + run=run, + provider_namespace=provider_namespace, + entity_type="sync", + severity="critical", + message=str(exc), + ) + finish_ingestion_run( + run=run, + status=IngestionRun.RunStatus.FAILED, + processed=summary.processed, + created=summary.created, + updated=summary.updated, + failed=summary.failed + 1, + ) + raise diff --git a/apps/ingestion/tasks.py b/apps/ingestion/tasks.py new file mode 100644 index 0000000..90e12bf --- /dev/null +++ b/apps/ingestion/tasks.py @@ -0,0 +1,38 @@ +from celery import shared_task + +from apps.ingestion.models import IngestionRun +from apps.ingestion.services.sync import run_sync_job +from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError + + +@shared_task( + bind=True, + autoretry_for=(ProviderRateLimitError, ProviderTransientError), + retry_backoff=True, + retry_jitter=True, + retry_kwargs={"max_retries": 5}, +) +def trigger_full_sync(self, provider_namespace: str, triggered_by_id: int | None = None, context: dict | None = None): + return run_sync_job( + provider_namespace=provider_namespace, + job_type=IngestionRun.JobType.FULL_SYNC, + triggered_by_id=triggered_by_id, + context=context or {}, + ).id + + +@shared_task( + bind=True, + autoretry_for=(ProviderRateLimitError, ProviderTransientError), + retry_backoff=True, + retry_jitter=True, + retry_kwargs={"max_retries": 5}, +) +def trigger_incremental_sync(self, provider_namespace: str, cursor: str | None = None, triggered_by_id: int | None = None, context: dict | None = None): + return run_sync_job( + provider_namespace=provider_namespace, + job_type=IngestionRun.JobType.INCREMENTAL, + triggered_by_id=triggered_by_id, + context=context or {}, + cursor=cursor, + ).id diff --git a/apps/providers/adapters/mvp_provider.py b/apps/providers/adapters/mvp_provider.py new file mode 100644 index 0000000..a421e20 --- /dev/null +++ b/apps/providers/adapters/mvp_provider.py @@ -0,0 +1,99 @@ +import json +import logging +import os +import time +from pathlib import Path + +from django.conf import settings + +from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError +from apps.providers.interfaces import BaseProviderAdapter + +logger = logging.getLogger(__name__) + + +class MvpDemoProviderAdapter(BaseProviderAdapter): + """MVP provider backed by a local JSON payload for deterministic development syncs.""" + + namespace = "mvp_demo" + + def __init__(self): + default_path = Path(settings.BASE_DIR) / "apps" / "providers" / "data" / "mvp_provider.json" + self.data_file = Path(os.getenv("PROVIDER_MVP_DATA_FILE", str(default_path))) + self.max_retries = int(os.getenv("PROVIDER_REQUEST_RETRIES", "3")) + self.retry_sleep_seconds = float(os.getenv("PROVIDER_REQUEST_RETRY_SLEEP", "1")) + + def _load_payload(self) -> dict: + for attempt in range(1, self.max_retries + 1): + try: + if os.getenv("PROVIDER_MVP_FORCE_RATE_LIMIT", "0") == "1": + raise ProviderRateLimitError("Simulated provider rate limit", retry_after_seconds=15) + with self.data_file.open("r", encoding="utf-8") as handle: + return json.load(handle) + except ProviderRateLimitError: + raise + except FileNotFoundError as exc: + logger.exception("Provider data file not found: %s", self.data_file) + raise ProviderTransientError(str(exc)) from exc + except json.JSONDecodeError as exc: + logger.exception("Invalid provider payload JSON in %s", self.data_file) + raise ProviderTransientError(str(exc)) from exc + except OSError as exc: + if attempt >= self.max_retries: + logger.exception("Provider payload read failed after retries") + raise ProviderTransientError(str(exc)) from exc + time.sleep(self.retry_sleep_seconds * attempt) + raise ProviderTransientError("Unable to read provider payload") + + def _payload_list(self, key: str) -> list[dict]: + payload = self._load_payload() + value = payload.get(key, []) + return value if isinstance(value, list) else [] + + def search_players(self, *, query: str = "", limit: int = 50, offset: int = 0) -> list[dict]: + players = self.fetch_players() + if query: + query_lower = query.lower() + players = [p for p in players if query_lower in p.get("full_name", "").lower()] + return players[offset : offset + limit] + + def fetch_player(self, *, external_player_id: str) -> dict | None: + for payload in self.fetch_players(): + if payload.get("external_id") == external_player_id: + return payload + return None + + def fetch_players(self) -> list[dict]: + return self._payload_list("players") + + def fetch_competitions(self) -> list[dict]: + return self._payload_list("competitions") + + def fetch_teams(self) -> list[dict]: + return self._payload_list("teams") + + def fetch_seasons(self) -> list[dict]: + return self._payload_list("seasons") + + def fetch_player_stats(self) -> list[dict]: + return self._payload_list("player_stats") + + def fetch_player_careers(self) -> list[dict]: + return self._payload_list("player_careers") + + def sync_all(self) -> dict: + return { + "players": self.fetch_players(), + "competitions": self.fetch_competitions(), + "teams": self.fetch_teams(), + "seasons": self.fetch_seasons(), + "player_stats": self.fetch_player_stats(), + "player_careers": self.fetch_player_careers(), + "cursor": None, + } + + def sync_incremental(self, *, cursor: str | None = None) -> dict: + payload = self.sync_all() + # MVP source has no change feed yet; returns full snapshot. + payload["cursor"] = cursor + return payload diff --git a/apps/providers/data/mvp_provider.json b/apps/providers/data/mvp_provider.json new file mode 100644 index 0000000..f367941 --- /dev/null +++ b/apps/providers/data/mvp_provider.json @@ -0,0 +1,152 @@ +{ + "players": [ + { + "external_id": "player-001", + "first_name": "Luca", + "last_name": "Rinaldi", + "full_name": "Luca Rinaldi", + "birth_date": "2002-04-11", + "nationality": {"name": "Italy", "iso2_code": "IT", "iso3_code": "ITA"}, + "nominal_position": {"code": "PG", "name": "Point Guard"}, + "inferred_role": {"code": "playmaker", "name": "Playmaker"}, + "height_cm": 191, + "weight_kg": 83, + "dominant_hand": "right", + "is_active": true, + "aliases": ["L. Rinaldi"] + }, + { + "external_id": "player-002", + "first_name": "Mateo", + "last_name": "Silva", + "full_name": "Mateo Silva", + "birth_date": "2000-09-23", + "nationality": {"name": "Spain", "iso2_code": "ES", "iso3_code": "ESP"}, + "nominal_position": {"code": "SF", "name": "Small Forward"}, + "inferred_role": {"code": "wing_scorer", "name": "Wing Scorer"}, + "height_cm": 201, + "weight_kg": 94, + "dominant_hand": "left", + "is_active": true, + "aliases": ["M. Silva"] + } + ], + "competitions": [ + { + "external_id": "comp-001", + "name": "Euro League", + "slug": "euro-league", + "competition_type": "international", + "gender": "men", + "level": 1, + "country": {"name": "Europe", "iso2_code": "EU", "iso3_code": "EUR"}, + "is_active": true + } + ], + "teams": [ + { + "external_id": "team-001", + "name": "Roma Hoops", + "short_name": "ROM", + "slug": "roma-hoops", + "country": {"name": "Italy", "iso2_code": "IT", "iso3_code": "ITA"}, + "is_national_team": false + }, + { + "external_id": "team-002", + "name": "Madrid Flight", + "short_name": "MAD", + "slug": "madrid-flight", + "country": {"name": "Spain", "iso2_code": "ES", "iso3_code": "ESP"}, + "is_national_team": false + } + ], + "seasons": [ + { + "external_id": "season-2024-2025", + "label": "2024-2025", + "start_date": "2024-09-01", + "end_date": "2025-06-30", + "is_current": false + }, + { + "external_id": "season-2025-2026", + "label": "2025-2026", + "start_date": "2025-09-01", + "end_date": "2026-06-30", + "is_current": true + } + ], + "player_stats": [ + { + "external_id": "stats-001", + "player_external_id": "player-001", + "team_external_id": "team-001", + "competition_external_id": "comp-001", + "season_external_id": "season-2025-2026", + "games_played": 26, + "games_started": 22, + "minutes_played": 780, + "points": 15.6, + "rebounds": 4.1, + "assists": 7.4, + "steals": 1.7, + "blocks": 0.2, + "turnovers": 2.3, + "fg_pct": 46.5, + "three_pct": 38.0, + "ft_pct": 82.3, + "usage_rate": 24.8, + "true_shooting_pct": 57.4, + "player_efficiency_rating": 19.1 + }, + { + "external_id": "stats-002", + "player_external_id": "player-002", + "team_external_id": "team-002", + "competition_external_id": "comp-001", + "season_external_id": "season-2025-2026", + "games_played": 24, + "games_started": 24, + "minutes_played": 816, + "points": 18.2, + "rebounds": 6.6, + "assists": 2.9, + "steals": 1.1, + "blocks": 0.6, + "turnovers": 2.1, + "fg_pct": 49.2, + "three_pct": 36.1, + "ft_pct": 79.9, + "usage_rate": 27.3, + "true_shooting_pct": 59.0, + "player_efficiency_rating": 20.8 + } + ], + "player_careers": [ + { + "external_id": "career-001", + "player_external_id": "player-001", + "team_external_id": "team-001", + "competition_external_id": "comp-001", + "season_external_id": "season-2025-2026", + "role_code": "playmaker", + "start_date": "2025-09-01", + "end_date": null, + "shirt_number": 5, + "notes": "Primary creator" + }, + { + "external_id": "career-002", + "player_external_id": "player-002", + "team_external_id": "team-002", + "competition_external_id": "comp-001", + "season_external_id": "season-2025-2026", + "role_code": "wing_scorer", + "start_date": "2025-09-01", + "end_date": null, + "shirt_number": 11, + "notes": "First scoring option" + } + ] +} diff --git a/apps/providers/exceptions.py b/apps/providers/exceptions.py new file mode 100644 index 0000000..b76a5fd --- /dev/null +++ b/apps/providers/exceptions.py @@ -0,0 +1,18 @@ +class ProviderError(Exception): + """Base provider exception.""" + + +class ProviderTransientError(ProviderError): + """Temporary provider failure that can be retried.""" + + +class ProviderRateLimitError(ProviderTransientError): + """Raised when provider rate limit is hit.""" + + def __init__(self, message: str, retry_after_seconds: int = 30): + super().__init__(message) + self.retry_after_seconds = retry_after_seconds + + +class ProviderNotFoundError(ProviderError): + """Raised when an unknown provider namespace is requested.""" diff --git a/apps/providers/interfaces.py b/apps/providers/interfaces.py new file mode 100644 index 0000000..32144d3 --- /dev/null +++ b/apps/providers/interfaces.py @@ -0,0 +1,45 @@ +from abc import ABC, abstractmethod + + +class BaseProviderAdapter(ABC): + namespace: str + + @abstractmethod + def search_players(self, *, query: str = "", limit: int = 50, offset: int = 0) -> list[dict]: + raise NotImplementedError + + @abstractmethod + def fetch_player(self, *, external_player_id: str) -> dict | None: + raise NotImplementedError + + @abstractmethod + def fetch_players(self) -> list[dict]: + raise NotImplementedError + + @abstractmethod + def fetch_competitions(self) -> list[dict]: + raise NotImplementedError + + @abstractmethod + def fetch_teams(self) -> list[dict]: + raise NotImplementedError + + @abstractmethod + def fetch_seasons(self) -> list[dict]: + raise NotImplementedError + + @abstractmethod + def fetch_player_stats(self) -> list[dict]: + raise NotImplementedError + + @abstractmethod + def fetch_player_careers(self) -> list[dict]: + raise NotImplementedError + + @abstractmethod + def sync_all(self) -> dict: + raise NotImplementedError + + @abstractmethod + def sync_incremental(self, *, cursor: str | None = None) -> dict: + raise NotImplementedError diff --git a/apps/providers/registry.py b/apps/providers/registry.py new file mode 100644 index 0000000..c76744b --- /dev/null +++ b/apps/providers/registry.py @@ -0,0 +1,17 @@ +from django.conf import settings + +from apps.providers.adapters.mvp_provider import MvpDemoProviderAdapter +from apps.providers.exceptions import ProviderNotFoundError + + +PROVIDER_REGISTRY = { + MvpDemoProviderAdapter.namespace: MvpDemoProviderAdapter, +} + + +def get_provider(namespace: str | None = None): + provider_namespace = namespace or settings.PROVIDER_DEFAULT_NAMESPACE + provider_cls = PROVIDER_REGISTRY.get(provider_namespace) + if not provider_cls: + raise ProviderNotFoundError(f"Unknown provider namespace: {provider_namespace}") + return provider_cls() diff --git a/config/settings/base.py b/config/settings/base.py index a083949..0f098e8 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -113,3 +113,13 @@ CELERY_ACCEPT_CONTENT = ["json"] CELERY_TASK_SERIALIZER = "json" CELERY_RESULT_SERIALIZER = "json" CELERY_TIMEZONE = TIME_ZONE +CELERY_TASK_TIME_LIMIT = int(os.getenv("CELERY_TASK_TIME_LIMIT", "1800")) +CELERY_TASK_SOFT_TIME_LIMIT = int(os.getenv("CELERY_TASK_SOFT_TIME_LIMIT", "1500")) + +PROVIDER_DEFAULT_NAMESPACE = os.getenv("PROVIDER_DEFAULT_NAMESPACE", "mvp_demo") +PROVIDER_MVP_DATA_FILE = os.getenv( + "PROVIDER_MVP_DATA_FILE", + str(BASE_DIR / "apps" / "providers" / "data" / "mvp_provider.json"), +) +PROVIDER_REQUEST_RETRIES = int(os.getenv("PROVIDER_REQUEST_RETRIES", "3")) +PROVIDER_REQUEST_RETRY_SLEEP = float(os.getenv("PROVIDER_REQUEST_RETRY_SLEEP", "1")) diff --git a/tests/test_ingestion_sync.py b/tests/test_ingestion_sync.py new file mode 100644 index 0000000..3b4bf15 --- /dev/null +++ b/tests/test_ingestion_sync.py @@ -0,0 +1,42 @@ +import os + +import pytest + +from apps.competitions.models import Competition, Season +from apps.ingestion.models import IngestionError, IngestionRun +from apps.ingestion.services.sync import run_sync_job +from apps.players.models import Player +from apps.providers.exceptions import ProviderRateLimitError +from apps.stats.models import PlayerSeason, PlayerSeasonStats +from apps.teams.models import Team + + +@pytest.mark.django_db +def test_run_full_sync_creates_domain_objects(settings): + settings.PROVIDER_DEFAULT_NAMESPACE = "mvp_demo" + + run = run_sync_job(provider_namespace="mvp_demo", job_type=IngestionRun.JobType.FULL_SYNC) + + assert run.status == IngestionRun.RunStatus.SUCCESS + assert Competition.objects.count() >= 1 + assert Team.objects.count() >= 1 + assert Season.objects.count() >= 1 + assert Player.objects.count() >= 1 + assert PlayerSeason.objects.count() >= 1 + assert PlayerSeasonStats.objects.count() >= 1 + + +@pytest.mark.django_db +def test_run_sync_handles_rate_limit(settings): + settings.PROVIDER_DEFAULT_NAMESPACE = "mvp_demo" + os.environ["PROVIDER_MVP_FORCE_RATE_LIMIT"] = "1" + + with pytest.raises(ProviderRateLimitError): + run_sync_job(provider_namespace="mvp_demo", job_type=IngestionRun.JobType.FULL_SYNC) + + run = IngestionRun.objects.order_by("-id").first() + assert run is not None + assert run.status == IngestionRun.RunStatus.FAILED + assert IngestionError.objects.filter(ingestion_run=run).exists() + + os.environ.pop("PROVIDER_MVP_FORCE_RATE_LIMIT", None)