Improve ingestion concurrency safety and batch transaction robustness

This commit is contained in:
Alfredo Di Stasio
2026-03-10 16:37:29 +01:00
parent abd3419aac
commit 2252821daf
5 changed files with 202 additions and 44 deletions

View File

@ -1,6 +1,7 @@
from datetime import timedelta import hashlib
from contextlib import contextmanager
from django.db.models import Q from django.db import connection
from django.utils import timezone from django.utils import timezone
from apps.ingestion.models import IngestionError, IngestionRun from apps.ingestion.models import IngestionError, IngestionRun
@ -62,14 +63,59 @@ def mark_ingestion_run_skipped(*, provider_namespace: str, job_type: str, reason
return run return run
def has_running_ingestion_run(*, provider_namespace: str, job_type: str, within_minutes: int) -> bool: def _build_ingestion_lock_key(*, provider_namespace: str, job_type: str) -> int:
cutoff = timezone.now() - timedelta(minutes=max(within_minutes, 1)) digest = hashlib.blake2b(
return IngestionRun.objects.filter( f"{provider_namespace}:{job_type}".encode("utf-8"),
digest_size=8,
).digest()
return int.from_bytes(digest, byteorder="big", signed=True)
def try_acquire_ingestion_lock(*, provider_namespace: str, job_type: str) -> tuple[bool, int]:
    """Attempt a non-blocking, session-level PostgreSQL advisory lock.

    Returns ``(acquired, lock_key)``; ``acquired`` is False when another
    session already holds the lock for the same (provider, job) pair.
    The caller is responsible for releasing an acquired lock.
    """
    key = _build_ingestion_lock_key(provider_namespace=provider_namespace, job_type=job_type)
    with connection.cursor() as cur:
        cur.execute("SELECT pg_try_advisory_lock(%s);", [key])
        row = cur.fetchone()
    return bool(row[0]), key
def release_ingestion_lock(*, lock_key: int) -> None:
    """Release a session-level advisory lock previously acquired on this connection."""
    with connection.cursor() as cur:
        cur.execute("SELECT pg_advisory_unlock(%s);", [lock_key])
@contextmanager
def ingestion_advisory_lock(*, provider_namespace: str, job_type: str):
    """Context manager around the per-job advisory lock.

    Yields True when this database session acquired the lock, False when it
    was already held elsewhere. The lock is released on exit only if it was
    acquired here, so a non-acquiring caller never unlocks a foreign holder.
    """
    got_lock, key = try_acquire_ingestion_lock(
        provider_namespace=provider_namespace,
        job_type=job_type,
    )
    try:
        yield got_lock
    finally:
        if got_lock:
            release_ingestion_lock(lock_key=key)
def update_ingestion_run_progress(
    *,
    run: IngestionRun,
    completed_step: str,
    step_summary: dict,
    source_counts: dict | None = None,
) -> IngestionRun:
    """Record a finished sync step in the run's JSON ``context`` and persist it.

    Appends ``completed_step`` to ``context["completed_steps"]``, stores its
    counters under ``context["step_summaries"]``, and — when provided —
    overwrites ``context["source_counts"]``. Copies are taken so the stored
    context is rebuilt rather than mutated in place; only the ``context``
    field is saved.
    """
    fresh_context = dict(run.context or {})

    steps_done = list(fresh_context.get("completed_steps") or [])
    steps_done.append(completed_step)
    fresh_context["completed_steps"] = steps_done

    summaries = dict(fresh_context.get("step_summaries") or {})
    summaries[completed_step] = step_summary
    fresh_context["step_summaries"] = summaries

    if source_counts is not None:
        fresh_context["source_counts"] = source_counts

    run.context = fresh_context
    run.save(update_fields=["context"])
    return run
def log_ingestion_error(*, run: IngestionRun, message: str, provider_namespace: str, severity: str = IngestionError.Severity.ERROR, entity_type: str = "", external_id: str = "", raw_payload: dict | None = None) -> IngestionError: def log_ingestion_error(*, run: IngestionRun, message: str, provider_namespace: str, severity: str = IngestionError.Severity.ERROR, entity_type: str = "", external_id: str = "", raw_payload: dict | None = None) -> IngestionError:

View File

@ -9,7 +9,12 @@ from django.utils.dateparse import parse_date
from apps.competitions.models import Competition, Season from apps.competitions.models import Competition, Season
from apps.ingestion.models import IngestionRun from apps.ingestion.models import IngestionRun
from apps.ingestion.services.runs import finish_ingestion_run, log_ingestion_error, start_ingestion_run from apps.ingestion.services.runs import (
finish_ingestion_run,
log_ingestion_error,
start_ingestion_run,
update_ingestion_run_progress,
)
from apps.players.models import Nationality, Player, PlayerAlias, PlayerCareerEntry, Position, Role from apps.players.models import Nationality, Player, PlayerAlias, PlayerCareerEntry, Position, Role
from apps.players.services.origin import refresh_player_origin from apps.players.services.origin import refresh_player_origin
from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError
@ -441,14 +446,54 @@ def run_sync_job(
if job_type == IngestionRun.JobType.INCREMENTAL if job_type == IngestionRun.JobType.INCREMENTAL
else provider.sync_all() else provider.sync_all()
) )
source_counts = {
"competitions": len(payload.get("competitions", [])),
"teams": len(payload.get("teams", [])),
"seasons": len(payload.get("seasons", [])),
"players": len(payload.get("players", [])),
"player_stats": len(payload.get("player_stats", [])),
"player_careers": len(payload.get("player_careers", [])),
}
with transaction.atomic(): steps: list[tuple[str, callable, list[dict]]] = [
_sync_competitions(provider_namespace, payload.get("competitions", []), run, summary) ("competitions", _sync_competitions, payload.get("competitions", [])),
_sync_teams(provider_namespace, payload.get("teams", []), run, summary) ("teams", _sync_teams, payload.get("teams", [])),
_sync_seasons(provider_namespace, payload.get("seasons", []), run, summary) ("seasons", _sync_seasons, payload.get("seasons", [])),
_sync_players(provider_namespace, payload.get("players", []), run, summary) ("players", _sync_players, payload.get("players", [])),
_sync_player_stats(provider_namespace, payload.get("player_stats", []), run, summary) ("player_stats", _sync_player_stats, payload.get("player_stats", [])),
_sync_player_careers(provider_namespace, payload.get("player_careers", []), run, summary) ("player_careers", _sync_player_careers, payload.get("player_careers", [])),
]
for step_name, step_fn, step_payload in steps:
step_summary = SyncSummary()
with transaction.atomic():
step_fn(provider_namespace, step_payload, run, step_summary)
summary.processed += step_summary.processed
summary.created += step_summary.created
summary.updated += step_summary.updated
summary.failed += step_summary.failed
update_ingestion_run_progress(
run=run,
completed_step=step_name,
step_summary={
"processed": step_summary.processed,
"created": step_summary.created,
"updated": step_summary.updated,
"failed": step_summary.failed,
},
source_counts=source_counts,
)
logger.info(
"Completed ingestion step run_id=%s step=%s processed=%s created=%s updated=%s failed=%s",
run.id,
step_name,
step_summary.processed,
step_summary.created,
step_summary.updated,
step_summary.failed,
)
success_error_summary = "" success_error_summary = ""
if summary.failed > 0: if summary.failed > 0:

View File

@ -4,7 +4,7 @@ from celery import shared_task
from django.conf import settings from django.conf import settings
from apps.ingestion.models import IngestionRun from apps.ingestion.models import IngestionRun
from apps.ingestion.services.runs import has_running_ingestion_run, mark_ingestion_run_skipped from apps.ingestion.services.runs import ingestion_advisory_lock, mark_ingestion_run_skipped
from apps.ingestion.services.sync import run_sync_job from apps.ingestion.services.sync import run_sync_job
from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError
from apps.providers.registry import get_default_provider_namespace from apps.providers.registry import get_default_provider_namespace
@ -21,23 +21,29 @@ def _run_sync_with_overlap_guard(
cursor: str | None = None, cursor: str | None = None,
): ):
effective_context = context or {} effective_context = context or {}
if settings.INGESTION_PREVENT_OVERLAP and has_running_ingestion_run( if settings.INGESTION_PREVENT_OVERLAP:
provider_namespace=provider_namespace, with ingestion_advisory_lock(provider_namespace=provider_namespace, job_type=job_type) as acquired:
job_type=job_type, if not acquired:
within_minutes=settings.INGESTION_OVERLAP_WINDOW_MINUTES, reason = (
): f"Skipped due to advisory lock for provider={provider_namespace}, "
reason = ( f"job_type={job_type}."
f"Skipped due to overlapping running job for provider={provider_namespace}, " )
f"job_type={job_type}." logger.warning(reason)
) run = mark_ingestion_run_skipped(
logger.warning(reason) provider_namespace=provider_namespace,
run = mark_ingestion_run_skipped( job_type=job_type,
provider_namespace=provider_namespace, reason=reason,
job_type=job_type, context=effective_context,
reason=reason, )
context=effective_context, return run.id
)
return run.id return run_sync_job(
provider_namespace=provider_namespace,
job_type=job_type,
triggered_by_id=triggered_by_id,
context=effective_context,
cursor=cursor,
).id
return run_sync_job( return run_sync_job(
provider_namespace=provider_namespace, provider_namespace=provider_namespace,

View File

@ -26,6 +26,15 @@ def test_run_full_sync_creates_domain_objects(settings):
assert PlayerSeason.objects.count() >= 1 assert PlayerSeason.objects.count() >= 1
assert PlayerSeasonStats.objects.count() >= 1 assert PlayerSeasonStats.objects.count() >= 1
assert Player.objects.filter(origin_competition__isnull=False).exists() assert Player.objects.filter(origin_competition__isnull=False).exists()
assert run.context.get("completed_steps") == [
"competitions",
"teams",
"seasons",
"players",
"player_stats",
"player_careers",
]
assert run.context.get("source_counts", {}).get("players", 0) >= 1
@pytest.mark.django_db @pytest.mark.django_db
@ -215,3 +224,24 @@ def test_balldontlie_sync_idempotency_with_stable_payload(monkeypatch):
} }
assert counts_first == counts_second assert counts_first == counts_second
@pytest.mark.django_db
def test_batch_transactions_preserve_prior_step_progress_on_failure(settings, monkeypatch):
    """A failing step must not roll back earlier, already-committed steps."""
    settings.PROVIDER_DEFAULT_NAMESPACE = "mvp_demo"

    def failing_team_sync(*args, **kwargs):
        raise RuntimeError("teams-sync-failed")

    # Sabotage the second step so the run dies after competitions committed.
    monkeypatch.setattr("apps.ingestion.services.sync._sync_teams", failing_team_sync)

    with pytest.raises(RuntimeError):
        run_sync_job(provider_namespace="mvp_demo", job_type=IngestionRun.JobType.FULL_SYNC)

    failed_run = IngestionRun.objects.order_by("-id").first()
    assert failed_run is not None
    assert failed_run.status == IngestionRun.RunStatus.FAILED
    # Step 1 (competitions) survived its own transaction; step 2 (teams) did not.
    assert Competition.objects.exists()
    assert Team.objects.count() == 0
    assert failed_run.context.get("completed_steps") == ["competitions"]
    assert "Unhandled ingestion error" in failed_run.error_summary

View File

@ -1,8 +1,11 @@
import pytest import pytest
from contextlib import contextmanager
from celery.schedules import crontab from celery.schedules import crontab
from django.utils import timezone import psycopg
from django.conf import settings
from apps.ingestion.models import IngestionRun from apps.ingestion.models import IngestionRun
from apps.ingestion.services.runs import _build_ingestion_lock_key, release_ingestion_lock, try_acquire_ingestion_lock
from apps.ingestion.tasks import scheduled_provider_sync, trigger_incremental_sync from apps.ingestion.tasks import scheduled_provider_sync, trigger_incremental_sync
from config.celery import app as celery_app, build_periodic_schedule from config.celery import app as celery_app, build_periodic_schedule
@ -44,23 +47,51 @@ def test_build_periodic_schedule_invalid_cron_disables_task_and_logs(settings, c
@pytest.mark.django_db
def test_trigger_incremental_sync_skips_when_advisory_lock_not_acquired(settings, monkeypatch):
    """When the advisory lock is held elsewhere, the task records a skipped run."""
    settings.INGESTION_PREVENT_OVERLAP = True

    @contextmanager
    def never_acquired_lock(**kwargs):
        yield False

    monkeypatch.setattr("apps.ingestion.tasks.ingestion_advisory_lock", never_acquired_lock)

    run_id = trigger_incremental_sync.apply(
        kwargs={"provider_namespace": "mvp_demo"},
    ).get()

    skipped_run = IngestionRun.objects.get(id=run_id)
    assert skipped_run.status == IngestionRun.RunStatus.CANCELED
    assert "advisory lock" in skipped_run.error_summary
@pytest.mark.django_db
def test_advisory_lock_prevents_concurrent_acquisition():
    """An external session holding the key blocks Django's try-acquire; release frees it."""
    provider_namespace = "mvp_demo"
    job_type = IngestionRun.JobType.INCREMENTAL
    lock_key = _build_ingestion_lock_key(provider_namespace=provider_namespace, job_type=job_type)

    db = settings.DATABASES["default"]
    conninfo = (
        f"dbname={db['NAME']} "
        f"user={db['USER']} "
        f"password={db['PASSWORD']} "
        f"host={db['HOST']} "
        f"port={db['PORT']}"
    )

    # Hold the lock from a second, independent connection.
    with psycopg.connect(conninfo) as external_conn:
        with external_conn.cursor() as cursor:
            cursor.execute("SELECT pg_advisory_lock(%s);", [lock_key])
            acquired, _ = try_acquire_ingestion_lock(
                provider_namespace=provider_namespace,
                job_type=job_type,
            )
            assert acquired is False
            cursor.execute("SELECT pg_advisory_unlock(%s);", [lock_key])

    # With the external holder gone, Django's session can take the lock.
    acquired, django_key = try_acquire_ingestion_lock(
        provider_namespace=provider_namespace,
        job_type=job_type,
    )
    assert acquired is True
    release_ingestion_lock(lock_key=django_key)
@pytest.mark.django_db @pytest.mark.django_db