581 lines
20 KiB
Python
581 lines
20 KiB
Python
import logging
|
|
from dataclasses import dataclass
|
|
from datetime import date
|
|
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.contrib.auth import get_user_model
|
|
from django.db import transaction
|
|
from django.utils.dateparse import parse_date
|
|
|
|
from apps.competitions.models import Competition, Season
|
|
from apps.ingestion.models import IngestionRun
|
|
from apps.ingestion.services.runs import (
|
|
finish_ingestion_run,
|
|
log_ingestion_error,
|
|
start_ingestion_run,
|
|
update_ingestion_run_progress,
|
|
)
|
|
from apps.players.models import Nationality, Player, PlayerAlias, PlayerCareerEntry, Position, Role
|
|
from apps.players.services.origin import refresh_player_origin
|
|
from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError
|
|
from apps.providers.registry import get_provider
|
|
from apps.providers.services.mappings import upsert_external_mapping
|
|
from apps.stats.models import PlayerSeason, PlayerSeasonStats
|
|
from apps.teams.models import Team
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class SyncSummary:
    """Running tally of record outcomes for one sync step or a whole run.

    The ``_sync_*`` helpers mutate an instance in place; ``run_sync_job``
    adds each per-step tally into a run-wide one.
    """

    # Payload records seen, regardless of outcome.
    processed: int = 0
    # Records that resulted in a newly created row.
    created: int = 0
    # Records that refreshed an already existing row.
    updated: int = 0
    # Records skipped after an ingestion error was logged.
    failed: int = 0
|
|
|
|
|
|
def _parse_date(value):
|
|
if not value:
|
|
return None
|
|
if isinstance(value, date):
|
|
return value
|
|
return parse_date(value)
|
|
|
|
|
|
def _upsert_nationality(payload: dict | None):
|
|
if not payload:
|
|
return None
|
|
iso2_code = payload.get("iso2_code")
|
|
if not iso2_code:
|
|
return None
|
|
nationality, _ = Nationality.objects.update_or_create(
|
|
iso2_code=iso2_code,
|
|
defaults={
|
|
"name": payload.get("name", iso2_code),
|
|
"iso3_code": payload.get("iso3_code"),
|
|
},
|
|
)
|
|
return nationality
|
|
|
|
|
|
def _upsert_position(payload: dict | None):
|
|
if not payload or not payload.get("code"):
|
|
return None
|
|
position, _ = Position.objects.update_or_create(
|
|
code=payload["code"],
|
|
defaults={"name": payload.get("name", payload["code"])},
|
|
)
|
|
return position
|
|
|
|
|
|
def _upsert_role(payload: dict | None):
|
|
if not payload or not payload.get("code"):
|
|
return None
|
|
role, _ = Role.objects.update_or_create(
|
|
code=payload["code"],
|
|
defaults={"name": payload.get("name", payload["code"])},
|
|
)
|
|
return role
|
|
|
|
|
|
def _resolve_mapped_entity(model_cls, provider_namespace: str, external_id: str):
    """Return the ``model_cls`` row mapped to a provider's external id.

    Looks up the first active external mapping for ``model_cls`` with the
    given namespace and external id, then loads the row it points at.

    Returns:
        The model instance, or ``None`` when there is no active mapping or
        the mapped row no longer exists.
    """
    content_type = ContentType.objects.get_for_model(model_cls)
    active_mappings = content_type.externalmapping_set.filter(
        provider_namespace=provider_namespace,
        external_id=external_id,
        is_active=True,
    )
    mapped_pk = active_mappings.values_list("object_id", flat=True).first()
    if not mapped_pk:
        return None
    return model_cls.objects.filter(pk=mapped_pk).first()
|
|
|
|
|
|
def _sync_competitions(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
    """Create or refresh a ``Competition`` for every provider payload.

    Each payload is counted as processed. Payloads without ``external_id``
    are logged as ingestion errors and skipped. Otherwise the competition
    already mapped to that external id is updated in place; when no mapping
    exists the competition is upserted by slug. The external mapping is
    (re)recorded afterwards. ``summary`` is mutated in place.
    """
    for record in payloads:
        summary.processed += 1

        ext_id = record.get("external_id")
        if not ext_id:
            # Nothing to map on: log the record and move on.
            summary.failed += 1
            log_ingestion_error(
                run=run,
                provider_namespace=provider_namespace,
                entity_type="competition",
                message="Missing external_id",
                raw_payload=record,
            )
            continue

        nationality = _upsert_nationality(record.get("country"))
        field_values = {
            "name": record.get("name", ext_id),
            "slug": record.get("slug", ext_id),
            "competition_type": record.get("competition_type", Competition.CompetitionType.LEAGUE),
            "gender": record.get("gender", Competition.Gender.MEN),
            "level": record.get("level", 1),
            "country": nationality,
            "is_active": record.get("is_active", True),
        }

        competition = _resolve_mapped_entity(Competition, provider_namespace, ext_id)
        if not competition:
            # No mapping yet: fall back to matching on slug.
            competition, was_created = Competition.objects.update_or_create(
                slug=field_values["slug"],
                defaults=field_values,
            )
            if was_created:
                summary.created += 1
            else:
                summary.updated += 1
        else:
            for attr_name, attr_value in field_values.items():
                setattr(competition, attr_name, attr_value)
            competition.save()
            summary.updated += 1

        upsert_external_mapping(
            provider_namespace=provider_namespace,
            external_id=ext_id,
            instance=competition,
            raw_payload=record,
        )
|
|
|
|
|
|
def _sync_teams(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
    """Create or refresh a ``Team`` for every provider payload.

    Each payload is counted as processed. Payloads without ``external_id``
    are logged as ingestion errors and skipped. Otherwise the team already
    mapped to that external id is updated in place; when no mapping exists
    the team is upserted by slug. The external mapping is (re)recorded
    afterwards. ``summary`` is mutated in place.
    """
    for record in payloads:
        summary.processed += 1

        ext_id = record.get("external_id")
        if not ext_id:
            # Nothing to map on: log the record and move on.
            summary.failed += 1
            log_ingestion_error(
                run=run,
                provider_namespace=provider_namespace,
                entity_type="team",
                message="Missing external_id",
                raw_payload=record,
            )
            continue

        nationality = _upsert_nationality(record.get("country"))
        field_values = {
            "name": record.get("name", ext_id),
            "short_name": record.get("short_name", ""),
            "slug": record.get("slug", ext_id),
            "country": nationality,
            "is_national_team": record.get("is_national_team", False),
        }

        team = _resolve_mapped_entity(Team, provider_namespace, ext_id)
        if not team:
            # No mapping yet: fall back to matching on slug.
            team, was_created = Team.objects.update_or_create(
                slug=field_values["slug"],
                defaults=field_values,
            )
            if was_created:
                summary.created += 1
            else:
                summary.updated += 1
        else:
            for attr_name, attr_value in field_values.items():
                setattr(team, attr_name, attr_value)
            team.save()
            summary.updated += 1

        upsert_external_mapping(
            provider_namespace=provider_namespace,
            external_id=ext_id,
            instance=team,
            raw_payload=record,
        )
|
|
|
|
|
|
def _sync_seasons(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
    """Create or refresh a ``Season`` for every provider payload.

    Each payload is counted as processed. Payloads without ``external_id``,
    or whose start/end date is missing or unparseable, are logged as
    ingestion errors and skipped. Seasons are upserted by label (the
    external id when no label is given) and the external mapping is
    (re)recorded. ``summary`` is mutated in place.
    """
    for record in payloads:
        summary.processed += 1

        ext_id = record.get("external_id")
        if not ext_id:
            summary.failed += 1
            log_ingestion_error(
                run=run,
                provider_namespace=provider_namespace,
                entity_type="season",
                message="Missing external_id",
                raw_payload=record,
            )
            continue

        starts_on = _parse_date(record.get("start_date"))
        ends_on = _parse_date(record.get("end_date"))
        if not (starts_on and ends_on):
            # Both boundaries are required for a season.
            summary.failed += 1
            log_ingestion_error(
                run=run,
                provider_namespace=provider_namespace,
                entity_type="season",
                external_id=ext_id,
                message="Invalid season dates",
                raw_payload=record,
            )
            continue

        season, was_created = Season.objects.update_or_create(
            label=record.get("label", ext_id),
            defaults={
                "start_date": starts_on,
                "end_date": ends_on,
                "is_current": record.get("is_current", False),
            },
        )
        if was_created:
            summary.created += 1
        else:
            summary.updated += 1

        upsert_external_mapping(
            provider_namespace=provider_namespace,
            external_id=ext_id,
            instance=season,
            raw_payload=record,
        )
|
|
|
|
|
|
def _sync_players(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
    """Create or refresh a ``Player`` (and its aliases) for every payload.

    Each payload is counted as processed. Payloads without ``external_id``
    are logged as ingestion errors and skipped. Otherwise the player already
    mapped to that external id is updated in place; when no mapping exists
    the player is upserted by ``full_name`` + ``birth_date``. Aliases
    previously stored for this provider are replaced by the payload's
    aliases, and the external mapping is (re)recorded. ``summary`` is
    mutated in place.
    """
    for record in payloads:
        summary.processed += 1

        ext_id = record.get("external_id")
        if not ext_id:
            summary.failed += 1
            log_ingestion_error(
                run=run,
                provider_namespace=provider_namespace,
                entity_type="player",
                message="Missing external_id",
                raw_payload=record,
            )
            continue

        # Related lookup rows are upserted first so the player can reference them.
        nationality = _upsert_nationality(record.get("nationality"))
        position = _upsert_position(record.get("nominal_position"))
        role = _upsert_role(record.get("inferred_role"))

        field_values = {
            "first_name": record.get("first_name", ""),
            "last_name": record.get("last_name", ""),
            "full_name": record.get("full_name", ext_id),
            "birth_date": _parse_date(record.get("birth_date")),
            "nationality": nationality,
            "nominal_position": position,
            "inferred_role": role,
            "height_cm": record.get("height_cm"),
            "weight_kg": record.get("weight_kg"),
            "dominant_hand": record.get("dominant_hand", Player.DominantHand.UNKNOWN),
            "is_active": record.get("is_active", True),
        }

        player = _resolve_mapped_entity(Player, provider_namespace, ext_id)
        if not player:
            # No mapping yet: fall back to matching on name and birth date.
            player, was_created = Player.objects.update_or_create(
                full_name=field_values["full_name"],
                birth_date=field_values["birth_date"],
                defaults=field_values,
            )
            if was_created:
                summary.created += 1
            else:
                summary.updated += 1
        else:
            for attr_name, attr_value in field_values.items():
                setattr(player, attr_name, attr_value)
            player.save()
            summary.updated += 1

        # Drop this provider's old aliases, then add the current ones.
        PlayerAlias.objects.filter(player=player, source=provider_namespace).delete()
        for alias_text in record.get("aliases", []):
            PlayerAlias.objects.get_or_create(
                player=player,
                alias=alias_text,
                defaults={"source": provider_namespace, "is_primary": False},
            )

        upsert_external_mapping(
            provider_namespace=provider_namespace,
            external_id=ext_id,
            instance=player,
            raw_payload=record,
        )
|
|
|
|
|
|
def _sync_player_stats(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
    """Upsert ``PlayerSeason`` and ``PlayerSeasonStats`` rows from payloads.

    Each payload is counted as processed. The player, team, competition and
    season are resolved through the provider's external mappings; a payload
    whose player or season cannot be resolved is logged as an ingestion
    error and skipped. Created/updated counts follow the ``PlayerSeason``
    row. ``summary`` is mutated in place.
    """
    # Stats defaulting to 0 versus stats left as None when absent.
    counting_stats = ("points", "rebounds", "assists", "steals", "blocks", "turnovers")
    optional_stats = (
        "fg_pct",
        "three_pct",
        "ft_pct",
        "usage_rate",
        "true_shooting_pct",
        "player_efficiency_rating",
    )

    def lookup(model_cls, record, key):
        # Resolve the entity mapped to the external id stored under ``key``.
        return _resolve_mapped_entity(model_cls, provider_namespace, record.get(key, ""))

    for record in payloads:
        summary.processed += 1
        ext_id = record.get("external_id", "")

        player = lookup(Player, record, "player_external_id")
        team = lookup(Team, record, "team_external_id")
        competition = lookup(Competition, record, "competition_external_id")
        season = lookup(Season, record, "season_external_id")

        if not (player and season):
            summary.failed += 1
            log_ingestion_error(
                run=run,
                provider_namespace=provider_namespace,
                entity_type="player_stats",
                external_id=ext_id,
                message="Player/season mapping missing",
                raw_payload=record,
            )
            continue

        appearance_values = {
            "games_played": record.get("games_played", 0),
            "games_started": record.get("games_started", 0),
            "minutes_played": record.get("minutes_played", 0),
        }
        player_season, was_created = PlayerSeason.objects.update_or_create(
            player=player,
            season=season,
            team=team,
            competition=competition,
            defaults=appearance_values,
        )
        if was_created:
            summary.created += 1
        else:
            summary.updated += 1

        stat_values = {name: record.get(name, 0) for name in counting_stats}
        stat_values.update({name: record.get(name) for name in optional_stats})
        PlayerSeasonStats.objects.update_or_create(
            player_season=player_season,
            defaults=stat_values,
        )
|
|
|
|
|
|
def _sync_player_careers(provider_namespace: str, payloads: list[dict], run: IngestionRun, summary: SyncSummary):
    """Upsert ``PlayerCareerEntry`` rows and refresh affected players' origin.

    Each payload is counted as processed. The player, team, competition and
    season are resolved through the provider's external mappings, and the
    role by ``role_code``. A payload whose player cannot be resolved is
    logged as an ingestion error and skipped. After all payloads are
    handled, ``refresh_player_origin`` is called once per touched player.
    ``summary`` is mutated in place.
    """

    def lookup(model_cls, record, key):
        # Resolve the entity mapped to the external id stored under ``key``.
        return _resolve_mapped_entity(model_cls, provider_namespace, record.get(key, ""))

    affected_ids: set[int] = set()

    for record in payloads:
        summary.processed += 1
        ext_id = record.get("external_id", "")

        player = lookup(Player, record, "player_external_id")
        team = lookup(Team, record, "team_external_id")
        competition = lookup(Competition, record, "competition_external_id")
        season = lookup(Season, record, "season_external_id")
        role = Role.objects.filter(code=record.get("role_code", "")).first()

        if not player:
            summary.failed += 1
            log_ingestion_error(
                run=run,
                provider_namespace=provider_namespace,
                entity_type="player_career",
                external_id=ext_id,
                message="Player mapping missing",
                raw_payload=record,
            )
            continue

        affected_ids.add(player.id)

        started_on = _parse_date(record.get("start_date"))
        entry_values = {
            "end_date": _parse_date(record.get("end_date")),
            "shirt_number": record.get("shirt_number"),
            "role_snapshot": role,
            "notes": record.get("notes", ""),
        }
        _entry, was_created = PlayerCareerEntry.objects.update_or_create(
            player=player,
            team=team,
            competition=competition,
            season=season,
            start_date=started_on,
            defaults=entry_values,
        )
        if was_created:
            summary.created += 1
        else:
            summary.updated += 1

    if not affected_ids:
        return

    # Recompute origin once per player rather than once per career entry.
    for touched_player in Player.objects.filter(id__in=affected_ids):
        refresh_player_origin(touched_player)
|
|
|
|
|
|
def _record_run_failure(
    run,
    provider_namespace: str,
    summary,
    *,
    entity_type: str,
    severity: str,
    message: str,
    error_summary: str,
    raw_payload: dict | None = None,
):
    """Log a run-level ingestion error and close ``run`` as FAILED.

    Args:
        run: The ingestion run being closed.
        provider_namespace: Provider the run was syncing from.
        summary: Run-wide tally accumulated before the failure.
        entity_type: Value stored on the ingestion error record.
        severity: Severity stored on the ingestion error record.
        message: Message stored on the ingestion error record.
        error_summary: Text stored on the run itself.
        raw_payload: Optional extra data for the error record; only
            forwarded when given.
    """
    error_kwargs = {
        "run": run,
        "provider_namespace": provider_namespace,
        "entity_type": entity_type,
        "severity": severity,
        "message": message,
    }
    if raw_payload is not None:
        error_kwargs["raw_payload"] = raw_payload
    log_ingestion_error(**error_kwargs)

    finish_ingestion_run(
        run=run,
        status=IngestionRun.RunStatus.FAILED,
        processed=summary.processed,
        created=summary.created,
        updated=summary.updated,
        # The run-level failure itself is counted as one more failure.
        failed=summary.failed + 1,
        error_summary=error_summary,
    )


def run_sync_job(
    *,
    provider_namespace: str,
    job_type: str,
    triggered_by_id: int | None = None,
    context: dict | None = None,
    cursor: str | None = None,
):
    """Run one provider sync end to end and record it as an ingestion run.

    Fetches the provider payload (incremental when ``job_type`` is
    ``IngestionRun.JobType.INCREMENTAL``, full otherwise) and runs the sync
    steps in dependency order: competitions, teams, seasons, players,
    player_stats, player_careers. Each step runs in its own transaction and
    its tally is reported via ``update_ingestion_run_progress``.

    Args:
        provider_namespace: Registry key of the provider to sync from.
        job_type: Sync mode, compared against ``IngestionRun.JobType``.
        triggered_by_id: Optional primary key of the triggering user.
        context: Optional context stored on the run (``{}`` when omitted).
        cursor: Cursor passed to the provider for incremental syncs.

    Returns:
        The ``IngestionRun``, finished with status SUCCESS.

    Raises:
        ProviderRateLimitError, ProviderTransientError, Exception: Any error
            is logged, the run is closed as FAILED, and the original
            exception is re-raised.
    """
    UserModel = get_user_model()
    triggered_by = None
    if triggered_by_id:
        triggered_by = UserModel.objects.filter(pk=triggered_by_id).first()

    run = start_ingestion_run(
        provider_namespace=provider_namespace,
        job_type=job_type,
        triggered_by=triggered_by,
        context=context or {},
    )
    summary = SyncSummary()
    logger.info(
        "Starting ingestion run id=%s provider=%s job_type=%s",
        run.id,
        provider_namespace,
        job_type,
    )

    try:
        provider = get_provider(provider_namespace)
        if job_type == IngestionRun.JobType.INCREMENTAL:
            payload = provider.sync_incremental(cursor=cursor)
        else:
            payload = provider.sync_all()

        # Order matters: later steps resolve entities mapped by earlier ones.
        step_fns = (
            ("competitions", _sync_competitions),
            ("teams", _sync_teams),
            ("seasons", _sync_seasons),
            ("players", _sync_players),
            ("player_stats", _sync_player_stats),
            ("player_careers", _sync_player_careers),
        )
        # Read each section once; a missing or null section counts as empty.
        step_payloads = {name: payload.get(name) or [] for name, _ in step_fns}
        source_counts = {name: len(items) for name, items in step_payloads.items()}

        for step_name, step_fn in step_fns:
            step_summary = SyncSummary()
            with transaction.atomic():
                step_fn(provider_namespace, step_payloads[step_name], run, step_summary)

            # Only a committed step contributes to the run-wide tally.
            summary.processed += step_summary.processed
            summary.created += step_summary.created
            summary.updated += step_summary.updated
            summary.failed += step_summary.failed

            update_ingestion_run_progress(
                run=run,
                completed_step=step_name,
                step_summary={
                    "processed": step_summary.processed,
                    "created": step_summary.created,
                    "updated": step_summary.updated,
                    "failed": step_summary.failed,
                },
                source_counts=source_counts,
            )
            logger.info(
                "Completed ingestion step run_id=%s step=%s processed=%s created=%s updated=%s failed=%s",
                run.id,
                step_name,
                step_summary.processed,
                step_summary.created,
                step_summary.updated,
                step_summary.failed,
            )

        # Record-level failures do not fail the run; they are only summarised.
        success_error_summary = ""
        if summary.failed > 0:
            success_error_summary = f"Completed with {summary.failed} failed record(s)."
        finish_ingestion_run(
            run=run,
            status=IngestionRun.RunStatus.SUCCESS,
            processed=summary.processed,
            created=summary.created,
            updated=summary.updated,
            failed=summary.failed,
            error_summary=success_error_summary,
        )
        logger.info(
            "Completed ingestion run id=%s status=%s processed=%s created=%s updated=%s failed=%s",
            run.id,
            IngestionRun.RunStatus.SUCCESS,
            summary.processed,
            summary.created,
            summary.updated,
            summary.failed,
        )
        return run

    except ProviderRateLimitError as exc:
        logger.warning("Rate limit from provider %s: %s", provider_namespace, exc)
        _record_run_failure(
            run,
            provider_namespace,
            summary,
            entity_type="provider",
            severity="warning",
            message=f"Rate limit: {exc}",
            error_summary=f"Rate limit from provider: {exc}",
            raw_payload={"retry_after_seconds": exc.retry_after_seconds},
        )
        raise

    except ProviderTransientError as exc:
        logger.exception("Transient provider error for namespace %s", provider_namespace)
        _record_run_failure(
            run,
            provider_namespace,
            summary,
            entity_type="provider",
            severity="error",
            message=str(exc),
            error_summary=f"Transient provider error: {exc}",
        )
        raise

    except Exception as exc:  # noqa: BLE001
        # Top-level boundary: record the failure on the run, then re-raise.
        logger.exception("Unhandled ingestion error")
        _record_run_failure(
            run,
            provider_namespace,
            summary,
            entity_type="sync",
            severity="critical",
            message=str(exc),
            error_summary=f"Unhandled ingestion error: {exc}",
        )
        raise
|