phase6: add provider abstraction, mvp adapter, and ingestion sync tasks

This commit is contained in:
Alfredo Di Stasio
2026-03-10 11:05:57 +01:00
parent f207ffbad8
commit ecd665e872
12 changed files with 1006 additions and 1 deletions

View File

@ -0,0 +1,9 @@
from .runs import finish_ingestion_run, log_ingestion_error, start_ingestion_run
from .sync import run_sync_job
__all__ = [
"start_ingestion_run",
"finish_ingestion_run",
"log_ingestion_error",
"run_sync_job",
]

View File

@ -0,0 +1,506 @@
import logging
from dataclasses import dataclass
from datetime import date
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth import get_user_model
from django.db import transaction
from django.utils.dateparse import parse_date
from apps.competitions.models import Competition, Season
from apps.ingestion.models import IngestionRun
from apps.ingestion.services.runs import finish_ingestion_run, log_ingestion_error, start_ingestion_run
from apps.players.models import Nationality, Player, PlayerAlias, PlayerCareerEntry, Position, Role
from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError
from apps.providers.registry import get_provider
from apps.providers.services.mappings import upsert_external_mapping
from apps.stats.models import PlayerSeason, PlayerSeasonStats
from apps.teams.models import Team
logger = logging.getLogger(__name__)
@dataclass
class SyncSummary:
processed: int = 0
created: int = 0
updated: int = 0
failed: int = 0
def _parse_date(value):
if not value:
return None
if isinstance(value, date):
return value
return parse_date(value)
def _upsert_nationality(payload: dict | None):
if not payload:
return None
iso2_code = payload.get("iso2_code")
if not iso2_code:
return None
nationality, _ = Nationality.objects.update_or_create(
iso2_code=iso2_code,
defaults={
"name": payload.get("name", iso2_code),
"iso3_code": payload.get("iso3_code"),
},
)
return nationality
def _upsert_position(payload: dict | None):
if not payload or not payload.get("code"):
return None
position, _ = Position.objects.update_or_create(
code=payload["code"],
defaults={"name": payload.get("name", payload["code"])},
)
return position
def _upsert_role(payload: dict | None):
if not payload or not payload.get("code"):
return None
role, _ = Role.objects.update_or_create(
code=payload["code"],
defaults={"name": payload.get("name", payload["code"])},
)
return role
def _resolve_mapped_entity(model_cls, provider_namespace: str, external_id: str):
ctype = ContentType.objects.get_for_model(model_cls)
object_id = (
ctype.externalmapping_set.filter(
provider_namespace=provider_namespace,
external_id=external_id,
is_active=True,
)
.values_list("object_id", flat=True)
.first()
)
if object_id:
return model_cls.objects.filter(pk=object_id).first()
return None
def _sync_competitions(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
for payload in payloads:
summary.processed += 1
external_id = payload.get("external_id")
if not external_id:
summary.failed += 1
log_ingestion_error(
run=run,
provider_namespace=provider_namespace,
entity_type="competition",
message="Missing external_id",
raw_payload=payload,
)
continue
country = _upsert_nationality(payload.get("country"))
defaults = {
"name": payload.get("name", external_id),
"slug": payload.get("slug", external_id),
"competition_type": payload.get("competition_type", Competition.CompetitionType.LEAGUE),
"gender": payload.get("gender", Competition.Gender.MEN),
"level": payload.get("level", 1),
"country": country,
"is_active": payload.get("is_active", True),
}
competition = _resolve_mapped_entity(Competition, provider_namespace, external_id)
if competition:
for key, value in defaults.items():
setattr(competition, key, value)
competition.save()
summary.updated += 1
else:
competition, created = Competition.objects.update_or_create(
slug=defaults["slug"],
defaults=defaults,
)
if created:
summary.created += 1
else:
summary.updated += 1
upsert_external_mapping(
provider_namespace=provider_namespace,
external_id=external_id,
instance=competition,
raw_payload=payload,
)
def _sync_teams(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
for payload in payloads:
summary.processed += 1
external_id = payload.get("external_id")
if not external_id:
summary.failed += 1
log_ingestion_error(
run=run,
provider_namespace=provider_namespace,
entity_type="team",
message="Missing external_id",
raw_payload=payload,
)
continue
country = _upsert_nationality(payload.get("country"))
defaults = {
"name": payload.get("name", external_id),
"short_name": payload.get("short_name", ""),
"slug": payload.get("slug", external_id),
"country": country,
"is_national_team": payload.get("is_national_team", False),
}
team = _resolve_mapped_entity(Team, provider_namespace, external_id)
if team:
for key, value in defaults.items():
setattr(team, key, value)
team.save()
summary.updated += 1
else:
team, created = Team.objects.update_or_create(slug=defaults["slug"], defaults=defaults)
if created:
summary.created += 1
else:
summary.updated += 1
upsert_external_mapping(
provider_namespace=provider_namespace,
external_id=external_id,
instance=team,
raw_payload=payload,
)
def _sync_seasons(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
for payload in payloads:
summary.processed += 1
external_id = payload.get("external_id")
if not external_id:
summary.failed += 1
log_ingestion_error(
run=run,
provider_namespace=provider_namespace,
entity_type="season",
message="Missing external_id",
raw_payload=payload,
)
continue
start_date = _parse_date(payload.get("start_date"))
end_date = _parse_date(payload.get("end_date"))
if not start_date or not end_date:
summary.failed += 1
log_ingestion_error(
run=run,
provider_namespace=provider_namespace,
entity_type="season",
external_id=external_id,
message="Invalid season dates",
raw_payload=payload,
)
continue
defaults = {
"start_date": start_date,
"end_date": end_date,
"is_current": payload.get("is_current", False),
}
season, created = Season.objects.update_or_create(
label=payload.get("label", external_id),
defaults=defaults,
)
if created:
summary.created += 1
else:
summary.updated += 1
upsert_external_mapping(
provider_namespace=provider_namespace,
external_id=external_id,
instance=season,
raw_payload=payload,
)
def _sync_players(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
for payload in payloads:
summary.processed += 1
external_id = payload.get("external_id")
if not external_id:
summary.failed += 1
log_ingestion_error(
run=run,
provider_namespace=provider_namespace,
entity_type="player",
message="Missing external_id",
raw_payload=payload,
)
continue
nationality = _upsert_nationality(payload.get("nationality"))
nominal_position = _upsert_position(payload.get("nominal_position"))
inferred_role = _upsert_role(payload.get("inferred_role"))
defaults = {
"first_name": payload.get("first_name", ""),
"last_name": payload.get("last_name", ""),
"full_name": payload.get("full_name", external_id),
"birth_date": _parse_date(payload.get("birth_date")),
"nationality": nationality,
"nominal_position": nominal_position,
"inferred_role": inferred_role,
"height_cm": payload.get("height_cm"),
"weight_kg": payload.get("weight_kg"),
"dominant_hand": payload.get("dominant_hand", Player.DominantHand.UNKNOWN),
"is_active": payload.get("is_active", True),
}
player = _resolve_mapped_entity(Player, provider_namespace, external_id)
if player:
for key, value in defaults.items():
setattr(player, key, value)
player.save()
summary.updated += 1
else:
player, created = Player.objects.update_or_create(
full_name=defaults["full_name"],
birth_date=defaults["birth_date"],
defaults=defaults,
)
if created:
summary.created += 1
else:
summary.updated += 1
PlayerAlias.objects.filter(player=player, source=provider_namespace).delete()
for alias in payload.get("aliases", []):
PlayerAlias.objects.get_or_create(
player=player,
alias=alias,
defaults={"source": provider_namespace, "is_primary": False},
)
upsert_external_mapping(
provider_namespace=provider_namespace,
external_id=external_id,
instance=player,
raw_payload=payload,
)
def _sync_player_stats(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
for payload in payloads:
summary.processed += 1
external_id = payload.get("external_id", "")
player = _resolve_mapped_entity(Player, provider_namespace, payload.get("player_external_id", ""))
team = _resolve_mapped_entity(Team, provider_namespace, payload.get("team_external_id", ""))
competition = _resolve_mapped_entity(Competition, provider_namespace, payload.get("competition_external_id", ""))
season = _resolve_mapped_entity(Season, provider_namespace, payload.get("season_external_id", ""))
if not player or not season:
summary.failed += 1
log_ingestion_error(
run=run,
provider_namespace=provider_namespace,
entity_type="player_stats",
external_id=external_id,
message="Player/season mapping missing",
raw_payload=payload,
)
continue
player_season, created = PlayerSeason.objects.update_or_create(
player=player,
season=season,
team=team,
competition=competition,
defaults={
"games_played": payload.get("games_played", 0),
"games_started": payload.get("games_started", 0),
"minutes_played": payload.get("minutes_played", 0),
},
)
if created:
summary.created += 1
else:
summary.updated += 1
PlayerSeasonStats.objects.update_or_create(
player_season=player_season,
defaults={
"points": payload.get("points", 0),
"rebounds": payload.get("rebounds", 0),
"assists": payload.get("assists", 0),
"steals": payload.get("steals", 0),
"blocks": payload.get("blocks", 0),
"turnovers": payload.get("turnovers", 0),
"fg_pct": payload.get("fg_pct"),
"three_pct": payload.get("three_pct"),
"ft_pct": payload.get("ft_pct"),
"usage_rate": payload.get("usage_rate"),
"true_shooting_pct": payload.get("true_shooting_pct"),
"player_efficiency_rating": payload.get("player_efficiency_rating"),
},
)
def _sync_player_careers(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
for payload in payloads:
summary.processed += 1
external_id = payload.get("external_id", "")
player = _resolve_mapped_entity(Player, provider_namespace, payload.get("player_external_id", ""))
team = _resolve_mapped_entity(Team, provider_namespace, payload.get("team_external_id", ""))
competition = _resolve_mapped_entity(Competition, provider_namespace, payload.get("competition_external_id", ""))
season = _resolve_mapped_entity(Season, provider_namespace, payload.get("season_external_id", ""))
role = Role.objects.filter(code=payload.get("role_code", "")).first()
if not player:
summary.failed += 1
log_ingestion_error(
run=run,
provider_namespace=provider_namespace,
entity_type="player_career",
external_id=external_id,
message="Player mapping missing",
raw_payload=payload,
)
continue
_, created = PlayerCareerEntry.objects.update_or_create(
player=player,
team=team,
competition=competition,
season=season,
start_date=_parse_date(payload.get("start_date")),
defaults={
"end_date": _parse_date(payload.get("end_date")),
"shirt_number": payload.get("shirt_number"),
"role_snapshot": role,
"notes": payload.get("notes", ""),
},
)
if created:
summary.created += 1
else:
summary.updated += 1
def run_sync_job(
*,
provider_namespace: str,
job_type: str,
triggered_by_id: int | None = None,
context: dict | None = None,
cursor: str | None = None,
):
UserModel = get_user_model()
triggered_by = None
if triggered_by_id:
triggered_by = UserModel.objects.filter(pk=triggered_by_id).first()
run = start_ingestion_run(
provider_namespace=provider_namespace,
job_type=job_type,
triggered_by=triggered_by,
context=context or {},
)
summary = SyncSummary()
try:
provider = get_provider(provider_namespace)
payload = (
provider.sync_incremental(cursor=cursor)
if job_type == IngestionRun.JobType.INCREMENTAL
else provider.sync_all()
)
with transaction.atomic():
_sync_competitions(provider_namespace, payload.get("competitions", []), run, summary)
_sync_teams(provider_namespace, payload.get("teams", []), run, summary)
_sync_seasons(provider_namespace, payload.get("seasons", []), run, summary)
_sync_players(provider_namespace, payload.get("players", []), run, summary)
_sync_player_stats(provider_namespace, payload.get("player_stats", []), run, summary)
_sync_player_careers(provider_namespace, payload.get("player_careers", []), run, summary)
finish_ingestion_run(
run=run,
status=IngestionRun.RunStatus.SUCCESS,
processed=summary.processed,
created=summary.created,
updated=summary.updated,
failed=summary.failed,
)
return run
except ProviderRateLimitError as exc:
logger.warning("Rate limit from provider %s: %s", provider_namespace, exc)
log_ingestion_error(
run=run,
provider_namespace=provider_namespace,
entity_type="provider",
severity="warning",
message=f"Rate limit: {exc}",
raw_payload={"retry_after_seconds": exc.retry_after_seconds},
)
finish_ingestion_run(
run=run,
status=IngestionRun.RunStatus.FAILED,
processed=summary.processed,
created=summary.created,
updated=summary.updated,
failed=summary.failed + 1,
)
raise
except ProviderTransientError as exc:
logger.exception("Transient provider error for namespace %s", provider_namespace)
log_ingestion_error(
run=run,
provider_namespace=provider_namespace,
entity_type="provider",
severity="error",
message=str(exc),
)
finish_ingestion_run(
run=run,
status=IngestionRun.RunStatus.FAILED,
processed=summary.processed,
created=summary.created,
updated=summary.updated,
failed=summary.failed + 1,
)
raise
except Exception as exc: # noqa: BLE001
logger.exception("Unhandled ingestion error")
log_ingestion_error(
run=run,
provider_namespace=provider_namespace,
entity_type="sync",
severity="critical",
message=str(exc),
)
finish_ingestion_run(
run=run,
status=IngestionRun.RunStatus.FAILED,
processed=summary.processed,
created=summary.created,
updated=summary.updated,
failed=summary.failed + 1,
)
raise