fix(v2-ingestion): harden LBA/BCL snapshot contract for public data
This commit is contained in:
@ -103,6 +103,86 @@ def test_valid_snapshot_import(tmp_path, settings):
|
||||
assert PlayerSeasonStats.objects.count() == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_snapshot_import_succeeds_with_optional_bio_and_physical_fields_missing(tmp_path, settings):
|
||||
incoming = tmp_path / "incoming"
|
||||
archive = tmp_path / "archive"
|
||||
failed = tmp_path / "failed"
|
||||
incoming.mkdir()
|
||||
archive.mkdir()
|
||||
failed.mkdir()
|
||||
|
||||
payload = _valid_payload()
|
||||
for optional_field in ("first_name", "last_name", "birth_date", "nationality", "height_cm", "weight_kg", "position", "role"):
|
||||
payload["records"][0].pop(optional_field, None)
|
||||
|
||||
file_path = incoming / "optional-missing.json"
|
||||
_write_json(file_path, payload)
|
||||
|
||||
settings.STATIC_DATASET_INCOMING_DIR = str(incoming)
|
||||
settings.STATIC_DATASET_ARCHIVE_DIR = str(archive)
|
||||
settings.STATIC_DATASET_FAILED_DIR = str(failed)
|
||||
|
||||
call_command("import_snapshots")
|
||||
|
||||
run = ImportRun.objects.get()
|
||||
assert run.status == ImportRun.RunStatus.SUCCESS
|
||||
player = Player.objects.get(source_uid="player-23")
|
||||
assert player.first_name == "LeBron"
|
||||
assert player.last_name == "James"
|
||||
assert player.birth_date is None
|
||||
assert player.nationality is None
|
||||
assert player.nominal_position is None
|
||||
assert player.height_cm is None
|
||||
assert player.weight_kg is None
|
||||
assert PlayerSeasonStats.objects.count() == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize(
|
||||
("source_name", "competition_id", "competition_name"),
|
||||
[
|
||||
("lba", "lba-serie-a", "Lega Basket Serie A"),
|
||||
("bcl", "bcl", "Basketball Champions League"),
|
||||
],
|
||||
)
|
||||
def test_partial_public_source_snapshot_imports_for_lba_and_bcl(
|
||||
tmp_path,
|
||||
settings,
|
||||
source_name,
|
||||
competition_id,
|
||||
competition_name,
|
||||
):
|
||||
incoming = tmp_path / "incoming"
|
||||
archive = tmp_path / "archive"
|
||||
failed = tmp_path / "failed"
|
||||
incoming.mkdir()
|
||||
archive.mkdir()
|
||||
failed.mkdir()
|
||||
|
||||
payload = _valid_payload()
|
||||
payload["source_name"] = source_name
|
||||
row = payload["records"][0]
|
||||
row["competition_external_id"] = competition_id
|
||||
row["competition_name"] = competition_name
|
||||
for optional_field in ("first_name", "last_name", "birth_date", "nationality", "height_cm", "weight_kg", "position", "role"):
|
||||
row.pop(optional_field, None)
|
||||
|
||||
_write_json(incoming / f"{source_name}.json", payload)
|
||||
|
||||
settings.STATIC_DATASET_INCOMING_DIR = str(incoming)
|
||||
settings.STATIC_DATASET_ARCHIVE_DIR = str(archive)
|
||||
settings.STATIC_DATASET_FAILED_DIR = str(failed)
|
||||
|
||||
call_command("import_snapshots")
|
||||
|
||||
run = ImportRun.objects.get()
|
||||
assert run.status == ImportRun.RunStatus.SUCCESS
|
||||
assert Competition.objects.filter(source_uid=competition_id, name=competition_name).exists()
|
||||
assert Player.objects.filter(source_uid="player-23").exists()
|
||||
assert PlayerSeasonStats.objects.count() == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_invalid_snapshot_rejected_and_moved_to_failed(tmp_path, settings):
|
||||
incoming = tmp_path / "incoming"
|
||||
|
||||
Reference in New Issue
Block a user