From 2252821dafd72d131216b301b6354df7e3ed24d4 Mon Sep 17 00:00:00 2001 From: Alfredo Di Stasio Date: Tue, 10 Mar 2026 16:37:29 +0100 Subject: [PATCH] Improve ingestion concurrency safety and batch transaction robustness --- apps/ingestion/services/runs.py | 62 ++++++++++++++++++++++++++++----- apps/ingestion/services/sync.py | 61 +++++++++++++++++++++++++++----- apps/ingestion/tasks.py | 42 ++++++++++++---------- tests/test_ingestion_sync.py | 30 ++++++++++++++++ tests/test_ingestion_tasks.py | 51 +++++++++++++++++++++------ 5 files changed, 202 insertions(+), 44 deletions(-) diff --git a/apps/ingestion/services/runs.py b/apps/ingestion/services/runs.py index a540dad..93dbc66 100644 --- a/apps/ingestion/services/runs.py +++ b/apps/ingestion/services/runs.py @@ -1,6 +1,7 @@ -from datetime import timedelta +import hashlib +from contextlib import contextmanager -from django.db.models import Q +from django.db import connection from django.utils import timezone from apps.ingestion.models import IngestionError, IngestionRun @@ -62,14 +63,59 @@ def mark_ingestion_run_skipped(*, provider_namespace: str, job_type: str, reason return run -def has_running_ingestion_run(*, provider_namespace: str, job_type: str, within_minutes: int) -> bool: - cutoff = timezone.now() - timedelta(minutes=max(within_minutes, 1)) - return IngestionRun.objects.filter( +def _build_ingestion_lock_key(*, provider_namespace: str, job_type: str) -> int: + digest = hashlib.blake2b( + f"{provider_namespace}:{job_type}".encode("utf-8"), + digest_size=8, + ).digest() + return int.from_bytes(digest, byteorder="big", signed=True) + + +def try_acquire_ingestion_lock(*, provider_namespace: str, job_type: str) -> tuple[bool, int]: + lock_key = _build_ingestion_lock_key(provider_namespace=provider_namespace, job_type=job_type) + with connection.cursor() as cursor: + cursor.execute("SELECT pg_try_advisory_lock(%s);", [lock_key]) + acquired = bool(cursor.fetchone()[0]) + return acquired, lock_key + + +def release_ingestion_lock(*, lock_key: int) -> None: + with connection.cursor() as cursor: + cursor.execute("SELECT pg_advisory_unlock(%s);", [lock_key]) + + +@contextmanager +def ingestion_advisory_lock(*, provider_namespace: str, job_type: str): + acquired, lock_key = try_acquire_ingestion_lock( provider_namespace=provider_namespace, job_type=job_type, - status=IngestionRun.RunStatus.RUNNING, - started_at__gte=cutoff, - ).filter(Q(finished_at__isnull=True) | Q(finished_at__gte=cutoff)).exists() + ) + try: + yield acquired + finally: + if acquired: + release_ingestion_lock(lock_key=lock_key) + + +def update_ingestion_run_progress( + *, + run: IngestionRun, + completed_step: str, + step_summary: dict, + source_counts: dict | None = None, +) -> IngestionRun: + context = dict(run.context or {}) + completed_steps = list(context.get("completed_steps") or []) + completed_steps.append(completed_step) + context["completed_steps"] = completed_steps + step_summaries = dict(context.get("step_summaries") or {}) + step_summaries[completed_step] = step_summary + context["step_summaries"] = step_summaries + if source_counts is not None: + context["source_counts"] = source_counts + run.context = context + run.save(update_fields=["context"]) + return run def log_ingestion_error(*, run: IngestionRun, message: str, provider_namespace: str, severity: str = IngestionError.Severity.ERROR, entity_type: str = "", external_id: str = "", raw_payload: dict | None = None) -> IngestionError: diff --git a/apps/ingestion/services/sync.py b/apps/ingestion/services/sync.py index 600fb04..2681d24 100644 --- a/apps/ingestion/services/sync.py +++ b/apps/ingestion/services/sync.py @@ -9,7 +9,12 @@ from django.utils.dateparse import parse_date from apps.competitions.models import Competition, Season from apps.ingestion.models import IngestionRun -from apps.ingestion.services.runs import finish_ingestion_run, log_ingestion_error, start_ingestion_run +from apps.ingestion.services.runs import ( + finish_ingestion_run, + log_ingestion_error, + start_ingestion_run, + update_ingestion_run_progress, +) from apps.players.models import Nationality, Player, PlayerAlias, PlayerCareerEntry, Position, Role from apps.players.services.origin import refresh_player_origin from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError @@ -441,14 +446,54 @@ def run_sync_job( if job_type == IngestionRun.JobType.INCREMENTAL else provider.sync_all() ) + source_counts = { + "competitions": len(payload.get("competitions", [])), + "teams": len(payload.get("teams", [])), + "seasons": len(payload.get("seasons", [])), + "players": len(payload.get("players", [])), + "player_stats": len(payload.get("player_stats", [])), + "player_careers": len(payload.get("player_careers", [])), + } - with transaction.atomic(): - _sync_competitions(provider_namespace, payload.get("competitions", []), run, summary) - _sync_teams(provider_namespace, payload.get("teams", []), run, summary) - _sync_seasons(provider_namespace, payload.get("seasons", []), run, summary) - _sync_players(provider_namespace, payload.get("players", []), run, summary) - _sync_player_stats(provider_namespace, payload.get("player_stats", []), run, summary) - _sync_player_careers(provider_namespace, payload.get("player_careers", []), run, summary) + steps: list[tuple[str, callable, list[dict]]] = [ + ("competitions", _sync_competitions, payload.get("competitions", [])), + ("teams", _sync_teams, payload.get("teams", [])), + ("seasons", _sync_seasons, payload.get("seasons", [])), + ("players", _sync_players, payload.get("players", [])), + ("player_stats", _sync_player_stats, payload.get("player_stats", [])), + ("player_careers", _sync_player_careers, payload.get("player_careers", [])), + ] + + for step_name, step_fn, step_payload in steps: + step_summary = SyncSummary() + with transaction.atomic(): + step_fn(provider_namespace, step_payload, run, step_summary) + + summary.processed += step_summary.processed + summary.created += step_summary.created + summary.updated += step_summary.updated + summary.failed += step_summary.failed + + update_ingestion_run_progress( + run=run, + completed_step=step_name, + step_summary={ + "processed": step_summary.processed, + "created": step_summary.created, + "updated": step_summary.updated, + "failed": step_summary.failed, + }, + source_counts=source_counts, + ) + logger.info( + "Completed ingestion step run_id=%s step=%s processed=%s created=%s updated=%s failed=%s", + run.id, + step_name, + step_summary.processed, + step_summary.created, + step_summary.updated, + step_summary.failed, + ) success_error_summary = "" if summary.failed > 0: diff --git a/apps/ingestion/tasks.py b/apps/ingestion/tasks.py index d996e4d..1199c38 100644 --- a/apps/ingestion/tasks.py +++ b/apps/ingestion/tasks.py @@ -4,7 +4,7 @@ from celery import shared_task from django.conf import settings from apps.ingestion.models import IngestionRun -from apps.ingestion.services.runs import has_running_ingestion_run, mark_ingestion_run_skipped +from apps.ingestion.services.runs import ingestion_advisory_lock, mark_ingestion_run_skipped from apps.ingestion.services.sync import run_sync_job from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError from apps.providers.registry import get_default_provider_namespace @@ -21,23 +21,29 @@ def _run_sync_with_overlap_guard( cursor: str | None = None, ): effective_context = context or {} - if settings.INGESTION_PREVENT_OVERLAP and has_running_ingestion_run( - provider_namespace=provider_namespace, - job_type=job_type, - within_minutes=settings.INGESTION_OVERLAP_WINDOW_MINUTES, - ): - reason = ( - f"Skipped due to overlapping running job for provider={provider_namespace}, " - f"job_type={job_type}." - ) - logger.warning(reason) - run = mark_ingestion_run_skipped( - provider_namespace=provider_namespace, - job_type=job_type, - reason=reason, - context=effective_context, - ) - return run.id + if settings.INGESTION_PREVENT_OVERLAP: + with ingestion_advisory_lock(provider_namespace=provider_namespace, job_type=job_type) as acquired: + if not acquired: + reason = ( + f"Skipped due to advisory lock for provider={provider_namespace}, " + f"job_type={job_type}." + ) + logger.warning(reason) + run = mark_ingestion_run_skipped( + provider_namespace=provider_namespace, + job_type=job_type, + reason=reason, + context=effective_context, + ) + return run.id + + return run_sync_job( + provider_namespace=provider_namespace, + job_type=job_type, + triggered_by_id=triggered_by_id, + context=effective_context, + cursor=cursor, + ).id return run_sync_job( provider_namespace=provider_namespace, diff --git a/tests/test_ingestion_sync.py b/tests/test_ingestion_sync.py index 9f99ca5..0bcf129 100644 --- a/tests/test_ingestion_sync.py +++ b/tests/test_ingestion_sync.py @@ -26,6 +26,15 @@ def test_run_full_sync_creates_domain_objects(settings): assert PlayerSeason.objects.count() >= 1 assert PlayerSeasonStats.objects.count() >= 1 assert Player.objects.filter(origin_competition__isnull=False).exists() + assert run.context.get("completed_steps") == [ + "competitions", + "teams", + "seasons", + "players", + "player_stats", + "player_careers", + ] + assert run.context.get("source_counts", {}).get("players", 0) >= 1 @pytest.mark.django_db @@ -215,3 +224,24 @@ def test_balldontlie_sync_idempotency_with_stable_payload(monkeypatch): } assert counts_first == counts_second + + +@pytest.mark.django_db +def test_batch_transactions_preserve_prior_step_progress_on_failure(settings, monkeypatch): + settings.PROVIDER_DEFAULT_NAMESPACE = "mvp_demo" + + def boom(*args, **kwargs): + raise RuntimeError("teams-sync-failed") + + monkeypatch.setattr("apps.ingestion.services.sync._sync_teams", boom) + + with pytest.raises(RuntimeError): + run_sync_job(provider_namespace="mvp_demo", job_type=IngestionRun.JobType.FULL_SYNC) + + run = IngestionRun.objects.order_by("-id").first() + assert run is not None + assert run.status == IngestionRun.RunStatus.FAILED + assert Competition.objects.exists() + assert Team.objects.count() == 0 + assert run.context.get("completed_steps") == ["competitions"] + assert "Unhandled ingestion error" in run.error_summary diff --git a/tests/test_ingestion_tasks.py b/tests/test_ingestion_tasks.py index ec399f4..c53973f 100644 --- a/tests/test_ingestion_tasks.py +++ b/tests/test_ingestion_tasks.py @@ -1,8 +1,11 @@ import pytest +from contextlib import contextmanager from celery.schedules import crontab -from django.utils import timezone +import psycopg +from django.conf import settings from apps.ingestion.models import IngestionRun +from apps.ingestion.services.runs import _build_ingestion_lock_key, release_ingestion_lock, try_acquire_ingestion_lock from apps.ingestion.tasks import scheduled_provider_sync, trigger_incremental_sync from config.celery import app as celery_app, build_periodic_schedule @@ -44,23 +47,51 @@ def test_build_periodic_schedule_invalid_cron_disables_task_and_logs(settings, c @pytest.mark.django_db -def test_trigger_incremental_sync_skips_overlapping_runs(settings): +def test_trigger_incremental_sync_skips_when_advisory_lock_not_acquired(settings, monkeypatch): settings.INGESTION_PREVENT_OVERLAP = True - settings.INGESTION_OVERLAP_WINDOW_MINUTES = 180 - IngestionRun.objects.create( - provider_namespace="mvp_demo", - job_type=IngestionRun.JobType.INCREMENTAL, - status=IngestionRun.RunStatus.RUNNING, - started_at=timezone.now(), - ) + @contextmanager + def fake_lock(**kwargs): + yield False + monkeypatch.setattr("apps.ingestion.tasks.ingestion_advisory_lock", fake_lock) run_id = trigger_incremental_sync.apply( kwargs={"provider_namespace": "mvp_demo"}, ).get() skipped_run = IngestionRun.objects.get(id=run_id) assert skipped_run.status == IngestionRun.RunStatus.CANCELED - assert "overlapping running job" in skipped_run.error_summary + assert "advisory lock" in skipped_run.error_summary + + +@pytest.mark.django_db +def test_advisory_lock_prevents_concurrent_acquisition(): + provider_namespace = "mvp_demo" + job_type = IngestionRun.JobType.INCREMENTAL + lock_key = _build_ingestion_lock_key(provider_namespace=provider_namespace, job_type=job_type) + + conninfo = ( + f"dbname={settings.DATABASES['default']['NAME']} " + f"user={settings.DATABASES['default']['USER']} " + f"password={settings.DATABASES['default']['PASSWORD']} " + f"host={settings.DATABASES['default']['HOST']} " + f"port={settings.DATABASES['default']['PORT']}" + ) + with psycopg.connect(conninfo) as external_conn: + with external_conn.cursor() as cursor: + cursor.execute("SELECT pg_advisory_lock(%s);", [lock_key]) + acquired, _ = try_acquire_ingestion_lock( + provider_namespace=provider_namespace, + job_type=job_type, + ) + assert acquired is False + cursor.execute("SELECT pg_advisory_unlock(%s);", [lock_key]) + + acquired, django_key = try_acquire_ingestion_lock( + provider_namespace=provider_namespace, + job_type=job_type, + ) + assert acquired is True + release_ingestion_lock(lock_key=django_key) @pytest.mark.django_db