feat(ingestion): add first public LBA Serie A importer
This commit is contained in:
359
app/scouting/importers/lba_public.py
Normal file
359
app/scouting/importers/lba_public.py
Normal file
@ -0,0 +1,359 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from decimal import Decimal
|
||||
from pathlib import Path
|
||||
from urllib.error import URLError
|
||||
from urllib.parse import urlencode
|
||||
from urllib.request import urlopen
|
||||
|
||||
from django.db import transaction
|
||||
|
||||
from scouting.models import (
|
||||
Competition,
|
||||
ExternalEntityMapping,
|
||||
Player,
|
||||
PlayerSeason,
|
||||
PlayerSeasonStats,
|
||||
Season,
|
||||
Team,
|
||||
)
|
||||
|
||||
|
||||
class ImportValidationError(ValueError):
    """Raised when LBA source data or existing mappings fail import validation.

    Subclasses ``ValueError`` so callers that already handle ``ValueError``
    also handle import failures.
    """
|
||||
|
||||
|
||||
# Identity of this data source and of the single competition it covers.
LBA_SOURCE_NAME = "lba_public"
LBA_COMPETITION_EXTERNAL_ID = "lba-serie-a"
LBA_COMPETITION_NAME = "Lega Basket Serie A"
LBA_COUNTRY = "IT"
LBA_LEVEL = "top"

# Public endpoint queried once per statistic category.
LBA_STATS_ENDPOINT = "https://www.legabasket.it/api/statistics/get-players-statistics"

# Source category name -> PlayerSeasonStats field that stores its score.
CATEGORY_TO_MODEL_FIELD = {
    "points": "points",
    "assists": "assists",
    "regain_balls": "steals",
    "lost_balls": "turnovers",
    "plus_minus": "plus_minus",
    "rating_oer": "offensive_rating",
}

# Categories to fetch, in mapping order (dicts preserve insertion order).
LBA_STAT_CATEGORIES = list(CATEGORY_TO_MODEL_FIELD)
|
||||
|
||||
|
||||
@dataclass
class ImportSummary:
    """Mutable counters describing what a single import run touched.

    Every counter starts at zero and is incremented by the importer as it
    creates or revisits players, teams and player-season contexts.
    """

    # Player rows.
    players_created: int = 0
    players_updated: int = 0

    # Team rows.
    teams_created: int = 0
    teams_updated: int = 0

    # Player-season context rows.
    contexts_created: int = 0
    contexts_updated: int = 0
|
||||
|
||||
|
||||
class LbaPublicStatsSource:
    """Fetch per-category player statistics from the public LBA HTTP endpoint."""

    def __init__(self, base_url: str = LBA_STATS_ENDPOINT, timeout_sec: int = 30):
        """Store the endpoint URL and the timeout passed to ``urlopen``."""
        self.base_url = base_url
        self.timeout_sec = timeout_sec

    def fetch_category(self, season_start_year: int, category: str) -> dict:
        """Download and decode the statistics payload for one category.

        Args:
            season_start_year: Sent to the endpoint as the ``s`` query parameter.
            category: Sent to the endpoint as the ``cat`` query parameter.

        Returns:
            The decoded JSON object; it is guaranteed to contain a ``stats`` key.

        Raises:
            ImportValidationError: If the request fails, the body is not valid
                UTF-8 JSON, or the decoded payload is not an object with a
                ``stats`` key.
        """
        query = urlencode({"s": season_start_year, "cat": category})
        url = f"{self.base_url}?{query}"

        try:
            with urlopen(url, timeout=self.timeout_sec) as response:
                raw_body = response.read()
        except (URLError, OSError) as exc:
            # URLError covers connection/HTTP failures raised by urlopen itself;
            # a timeout or reset while reading the body surfaces as a plain
            # OSError (e.g. TimeoutError) and must be translated as well.
            raise ImportValidationError(f"Could not fetch LBA source URL '{url}': {exc}") from exc

        try:
            payload = json.loads(raw_body.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            # A body that is not UTF-8 is just as unusable as malformed JSON.
            raise ImportValidationError(f"Invalid JSON received from '{url}': {exc}") from exc

        if not isinstance(payload, dict) or "stats" not in payload:
            raise ImportValidationError(f"LBA source response from '{url}' is missing 'stats'.")
        return payload
|
||||
|
||||
|
||||
class LbaFixtureStatsSource:
    """Serve category payloads from a local JSON fixture file.

    The fixture must be a JSON object with a ``categories`` object that maps
    each category name to a payload object containing a ``stats`` key.
    """

    def __init__(self, fixture_path: Path):
        """Load and validate the fixture file.

        Args:
            fixture_path: Path to a UTF-8 encoded JSON fixture.

        Raises:
            ImportValidationError: If the file is missing, is not valid UTF-8
                JSON, or does not contain a ``categories`` object.
        """
        self.fixture_path = fixture_path
        try:
            payload = json.loads(fixture_path.read_text(encoding="utf-8"))
        except FileNotFoundError as exc:
            raise ImportValidationError(f"Fixture file not found: {fixture_path}") from exc
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            raise ImportValidationError(f"Invalid fixture JSON at '{fixture_path}': {exc}") from exc

        # A top-level array or scalar has no .get(); treat it like a payload
        # without 'categories' instead of crashing with AttributeError.
        categories = payload.get("categories") if isinstance(payload, dict) else None
        if not isinstance(categories, dict):
            raise ImportValidationError("Fixture payload must include a 'categories' object.")
        self.categories = categories

    def fetch_category(self, season_start_year: int, category: str) -> dict:
        """Return the fixture payload stored for ``category``.

        Args:
            season_start_year: Accepted for interface parity with the HTTP
                source; the fixture is not keyed by season, so it is ignored.
            category: Name of the category to look up.

        Returns:
            The payload object for the category; it contains a ``stats`` key.

        Raises:
            ImportValidationError: If the category is absent or its payload is
                not an object with a ``stats`` key.
        """
        del season_start_year
        payload = self.categories.get(category)
        if payload is None:
            raise ImportValidationError(f"Fixture payload missing category '{category}'.")
        if not isinstance(payload, dict) or "stats" not in payload:
            raise ImportValidationError(f"Fixture category '{category}' is missing 'stats'.")
        return payload
|
||||
|
||||
|
||||
class LbaSerieAPublicImporter:
    """Import LBA Serie A player statistics into the scouting models.

    The importer pulls one payload per category in ``LBA_STAT_CATEGORIES``
    from ``source``, merges the rows per (player, team, year), and upserts the
    competition, seasons, teams, players, player-season contexts and stats.
    External identifiers are tracked through ``ExternalEntityMapping`` rows
    under the ``LBA_SOURCE_NAME`` source.
    """

    def __init__(self, *, season_start_year: int, source: LbaPublicStatsSource | LbaFixtureStatsSource):
        """Store the season to request and the source to fetch it from."""
        self.season_start_year = season_start_year
        self.source = source
        self.summary = ImportSummary()

    @transaction.atomic
    def run(self) -> ImportSummary:
        """Execute the import inside a single database transaction.

        Returns:
            The summary counters accumulated during the run.

        Raises:
            ImportValidationError: If source data or existing mappings are
                inconsistent; the surrounding transaction is rolled back.
        """
        aggregated = self._collect_players()
        competition = self._upsert_competition()

        # Many records share a season; resolve each distinct year only once.
        seasons: dict[int, Season] = {}

        for record in aggregated.values():
            year = record["year"]
            season = seasons.get(year)
            if season is None:
                season = seasons[year] = self._upsert_season(year)
            team = self._upsert_team(record)
            player = self._upsert_player(record)
            context = self._upsert_player_season(record, player, team, season, competition)
            self._upsert_stats(context, record)

        return self.summary

    def _collect_players(self) -> dict:
        """Fetch every category and merge rows per (player_id, team_id, year).

        Returns:
            A dict keyed by ``(player_id, team_id, year)``; each value holds
            the identity fields of the first row seen for that key plus a
            ``scores`` dict mapping category name to a ``Decimal`` score.

        Raises:
            ImportValidationError: If a payload or row is malformed, or no
                rows were found in any category.
        """
        players = {}

        for category in LBA_STAT_CATEGORIES:
            payload = self.source.fetch_category(self.season_start_year, category)
            stats = payload.get("stats")
            if not isinstance(stats, list):
                raise ImportValidationError(f"Category '{category}' response must include a list in 'stats'.")

            for row in stats:
                self._validate_stat_row(category, row)
                # Normalise the year up front: season arithmetic needs an int,
                # and "2024" vs 2024 must not produce two distinct records.
                year = self._to_year(category, row["year"])
                key = (row["player_id"], row["team_id"], year)
                if key not in players:
                    players[key] = {
                        "player_id": row["player_id"],
                        "team_id": row["team_id"],
                        "year": year,
                        "name": row["name"],
                        "surname": row["surname"],
                        "team_name": row["team_name"],
                        "scores": {},
                    }
                players[key]["scores"][category] = self._to_decimal(row.get("score"))

        if not players:
            raise ImportValidationError("No player statistics found from LBA source.")
        return players

    def _validate_stat_row(self, category: str, row: dict) -> None:
        """Ensure ``row`` is an object with every required field populated.

        Raises:
            ImportValidationError: If ``row`` is not a dict, or a required
                field is missing, ``None`` or an empty string.
        """
        if not isinstance(row, dict):
            raise ImportValidationError(f"Category '{category}' contains a non-object stat row.")
        for field in ["player_id", "team_id", "year", "name", "surname", "team_name", "score"]:
            if row.get(field) in (None, ""):
                raise ImportValidationError(f"Category '{category}' row missing required field '{field}'.")

    @staticmethod
    def _to_year(category: str, value) -> int:
        """Coerce a row's ``year`` value to ``int``.

        Raises:
            ImportValidationError: If ``value`` cannot be converted to an int.
        """
        try:
            return int(value)
        except (TypeError, ValueError) as exc:
            raise ImportValidationError(
                f"Category '{category}' row has a non-integer 'year': {value!r}."
            ) from exc

    @staticmethod
    def _to_decimal(value) -> Decimal:
        """Convert a source score to a finite ``Decimal``.

        The value goes through ``str`` first so floats keep their printed
        representation instead of their binary expansion.

        Raises:
            ImportValidationError: If the value is not a decimal number, or is
                NaN or infinite.
        """
        # Local import: the module-level import only brings in ``Decimal``.
        from decimal import InvalidOperation

        try:
            number = Decimal(str(value))
        except InvalidOperation as exc:
            raise ImportValidationError(f"Score value {value!r} is not a valid decimal number.") from exc
        if not number.is_finite():
            raise ImportValidationError(f"Score value {value!r} is not a finite decimal number.")
        return number

    def _mapping_for(self, entity_type: str, external_id: str) -> ExternalEntityMapping | None:
        """Return this source's mapping for the external id, or ``None``."""
        return ExternalEntityMapping.objects.filter(
            source_name=LBA_SOURCE_NAME,
            entity_type=entity_type,
            external_id=external_id,
        ).first()

    def _bind_mapping(self, *, entity_type: str, external_id: str, object_id: int) -> None:
        """Ensure ``external_id`` is linked to ``object_id`` for this source.

        Raises:
            ImportValidationError: If the external id is already linked to a
                different object, or the object is already linked to a
                different external id.
        """
        existing_for_external = self._mapping_for(entity_type, external_id)
        if existing_for_external and existing_for_external.object_id != object_id:
            raise ImportValidationError(
                f"External ID '{external_id}' for {entity_type} is already linked to a different record."
            )

        existing_for_object = ExternalEntityMapping.objects.filter(
            source_name=LBA_SOURCE_NAME,
            entity_type=entity_type,
            object_id=object_id,
        ).first()
        if existing_for_object and existing_for_object.external_id != external_id:
            raise ImportValidationError(
                f"Conflicting mapping for {entity_type} object {object_id}: "
                f"'{existing_for_object.external_id}' vs '{external_id}'."
            )

        ExternalEntityMapping.objects.get_or_create(
            source_name=LBA_SOURCE_NAME,
            entity_type=entity_type,
            external_id=external_id,
            defaults={"object_id": object_id},
        )

    def _upsert_competition(self) -> Competition:
        """Create or refresh the LBA competition row and bind its mapping.

        Raises:
            ImportValidationError: If an existing mapping points to a
                competition that no longer exists.
        """
        mapping = self._mapping_for(ExternalEntityMapping.EntityType.COMPETITION, LBA_COMPETITION_EXTERNAL_ID)
        defaults = {"country": LBA_COUNTRY, "level": LBA_LEVEL}

        if mapping:
            competition = Competition.objects.filter(pk=mapping.object_id).first()
            if competition is None:
                raise ImportValidationError("Competition mapping points to a missing record.")
            # The mapped row is authoritative: re-apply the canonical name and attributes.
            Competition.objects.filter(pk=competition.pk).update(name=LBA_COMPETITION_NAME, **defaults)
            competition.refresh_from_db()
        else:
            competition, _ = Competition.objects.get_or_create(name=LBA_COMPETITION_NAME, defaults=defaults)

        self._bind_mapping(
            entity_type=ExternalEntityMapping.EntityType.COMPETITION,
            external_id=LBA_COMPETITION_EXTERNAL_ID,
            object_id=competition.id,
        )
        return competition

    def _upsert_season(self, year: int) -> Season:
        """Return the season named ``"<year>-<year + 1>"``, creating it if needed.

        Raises:
            ImportValidationError: If a season with that name exists but its
                start/end years differ from the expected ones.
        """
        season_name = f"{year}-{year + 1}"
        season, created = Season.objects.get_or_create(
            name=season_name,
            defaults={"start_year": year, "end_year": year + 1},
        )
        if not created and (season.start_year != year or season.end_year != year + 1):
            raise ImportValidationError(
                f"Season '{season_name}' exists but does not match expected years {year}-{year + 1}."
            )
        return season

    def _upsert_team(self, record: dict) -> Team:
        """Resolve the record's team by mapping, else by name and country.

        A mapped team has its name/country synced to the record; an unmapped
        one is fetched or created by ``(team_name, LBA_COUNTRY)``. The team
        mapping is bound in both cases.

        Raises:
            ImportValidationError: If an existing mapping points to a team
                that no longer exists.
        """
        external_id = str(record["team_id"])
        mapping = self._mapping_for(ExternalEntityMapping.EntityType.TEAM, external_id)

        if mapping:
            team = Team.objects.filter(pk=mapping.object_id).first()
            if team is None:
                raise ImportValidationError("Team mapping points to a missing record.")
            updates = []
            if team.name != record["team_name"]:
                team.name = record["team_name"]
                updates.append("name")
            if team.country != LBA_COUNTRY:
                team.country = LBA_COUNTRY
                updates.append("country")
            if updates:
                # Only write (and count) when something actually changed.
                team.save(update_fields=updates + ["updated_at"])
                self.summary.teams_updated += 1
        else:
            team, created = Team.objects.get_or_create(
                name=record["team_name"],
                country=LBA_COUNTRY,
                defaults={},
            )
            if created:
                self.summary.teams_created += 1
            else:
                self.summary.teams_updated += 1

        self._bind_mapping(
            entity_type=ExternalEntityMapping.EntityType.TEAM,
            external_id=external_id,
            object_id=team.id,
        )
        return team

    def _upsert_player(self, record: dict) -> Player:
        """Resolve the record's player by mapping, else create a new player.

        Raises:
            ImportValidationError: If an existing mapping points to a player
                that no longer exists.
        """
        external_id = str(record["player_id"])
        mapping = self._mapping_for(ExternalEntityMapping.EntityType.PLAYER, external_id)
        full_name = f"{record['name']} {record['surname']}".strip()

        if mapping:
            player = Player.objects.filter(pk=mapping.object_id).first()
            if player is None:
                raise ImportValidationError("Player mapping points to a missing record.")
            # Keep an existing position; fall back to the neutral default only when unset.
            position = player.position or Player.Position.SG
            player.full_name = full_name
            player.first_name = record["name"]
            player.last_name = record["surname"]
            player.position = position
            player.save()
            self.summary.players_updated += 1
        else:
            # LBA stats endpoint does not expose position directly. To satisfy the current
            # required model field without guessing role/taxonomy data, we use a neutral
            # default and keep role/specialty ownership untouched.
            player = Player.objects.create(
                full_name=full_name,
                first_name=record["name"],
                last_name=record["surname"],
                position=Player.Position.SG,
            )
            self.summary.players_created += 1

        self._bind_mapping(
            entity_type=ExternalEntityMapping.EntityType.PLAYER,
            external_id=external_id,
            object_id=player.id,
        )
        return player

    def _upsert_player_season(
        self,
        record: dict,
        player: Player,
        team: Team,
        season: Season,
        competition: Competition,
    ) -> PlayerSeason:
        """Resolve the player-season context for the record and bind its mapping.

        The context's external id is ``"<year>:<team_id>:<player_id>"``.

        Raises:
            ImportValidationError: If an existing mapping points to a missing
                context, or to one whose player, team, season or competition
                differs from the ones resolved for this record.
        """
        context_external_id = f"{record['year']}:{record['team_id']}:{record['player_id']}"
        mapping = self._mapping_for(ExternalEntityMapping.EntityType.PLAYER_SEASON, context_external_id)

        if mapping:
            context = PlayerSeason.objects.filter(pk=mapping.object_id).first()
            if context is None:
                raise ImportValidationError("PlayerSeason mapping points to a missing record.")
            if (
                context.player_id != player.id
                or context.team_id != team.id
                or context.season_id != season.id
                or context.competition_id != competition.id
            ):
                raise ImportValidationError("Mapped player-season context does not match incoming source identity.")
            self.summary.contexts_updated += 1
        else:
            context, created = PlayerSeason.objects.get_or_create(
                player=player,
                team=team,
                season=season,
                competition=competition,
                defaults={},
            )
            if created:
                self.summary.contexts_created += 1
            else:
                self.summary.contexts_updated += 1

        self._bind_mapping(
            entity_type=ExternalEntityMapping.EntityType.PLAYER_SEASON,
            external_id=context_external_id,
            object_id=context.id,
        )
        return context

    def _upsert_stats(self, context: PlayerSeason, record: dict) -> None:
        """Write the record's scores to the context's ``PlayerSeasonStats`` row.

        Only the fields listed in ``CATEGORY_TO_MODEL_FIELD`` are written. A
        mapped category that is absent from the record is stored as ``None``.
        Stat fields this importer never populates are left out of the update
        so a re-import does not null values written by something else.
        """
        scores = record["scores"]
        stats_defaults = {
            model_field: scores.get(category)
            for category, model_field in CATEGORY_TO_MODEL_FIELD.items()
        }
        PlayerSeasonStats.objects.update_or_create(player_season=context, defaults=stats_defaults)
|
||||
Reference in New Issue
Block a user