From ceff4bc42c7bfbe656776614361c76ad67daf28e Mon Sep 17 00:00:00 2001 From: Alfredo Di Stasio Date: Tue, 10 Mar 2026 13:44:36 +0100 Subject: [PATCH] Wire Celery Beat periodic sync with ingestion run tracking --- .env.example | 6 ++ README.md | 17 ++++- apps/ingestion/admin.py | 20 +++++ .../0002_ingestionrun_error_summary.py | 18 +++++ apps/ingestion/models.py | 1 + apps/ingestion/services/runs.py | 40 +++++++++- apps/ingestion/services/sync.py | 22 ++++++ apps/ingestion/tasks.py | 75 ++++++++++++++++++- config/celery.py | 33 ++++++++ config/settings/base.py | 9 +++ tests/test_ingestion_sync.py | 7 ++ tests/test_ingestion_tasks.py | 69 +++++++++++++++++ 12 files changed, 311 insertions(+), 6 deletions(-) create mode 100644 apps/ingestion/migrations/0002_ingestionrun_error_summary.py create mode 100644 tests/test_ingestion_tasks.py diff --git a/.env.example b/.env.example index 3082c64..92c2f74 100644 --- a/.env.example +++ b/.env.example @@ -55,6 +55,12 @@ PROVIDER_BALLDONTLIE_STATS_PAGE_LIMIT=10 PROVIDER_BALLDONTLIE_STATS_PER_PAGE=100 CELERY_TASK_TIME_LIMIT=1800 CELERY_TASK_SOFT_TIME_LIMIT=1500 +INGESTION_SCHEDULE_ENABLED=0 +INGESTION_SCHEDULE_CRON=*/30 * * * * +INGESTION_SCHEDULE_PROVIDER_NAMESPACE= +INGESTION_SCHEDULE_JOB_TYPE=incremental +INGESTION_PREVENT_OVERLAP=1 +INGESTION_OVERLAP_WINDOW_MINUTES=180 API_THROTTLE_ANON=100/hour API_THROTTLE_USER=1000/hour diff --git a/README.md b/README.md index 12555bd..c2453c2 100644 --- a/README.md +++ b/README.md @@ -97,7 +97,7 @@ docker compose exec web python manage.py createsuperuser - `web` service also builds Tailwind CSS before `collectstatic` when `AUTO_BUILD_TAILWIND=1`. - `web`, `celery_worker`, `celery_beat`, and `tailwind` run as a non-root user inside the image. - `celery_worker` executes background sync work. -- `celery_beat` supports scheduled jobs (future scheduling strategy can be added per provider). +- `celery_beat` triggers periodic provider sync (`apps.ingestion.tasks.scheduled_provider_sync`). - `tailwind` service runs watch mode for development (`npm run dev`). - nginx proxies web traffic and serves static/media volume mounts. @@ -222,6 +222,21 @@ trigger_full_sync.delay(provider_namespace="balldontlie") - Run-level status/counters: `IngestionRun` - Structured error records: `IngestionError` - Provider entity mappings + diagnostic payload snippets: `ExternalMapping` +- `IngestionRun.error_summary` captures top-level failure/partial-failure context + +### Scheduled sync via Celery Beat + +Configure scheduled sync through environment variables: + +- `INGESTION_SCHEDULE_ENABLED` (`0`/`1`) +- `INGESTION_SCHEDULE_CRON` (5-field cron expression, default `*/30 * * * *`) +- `INGESTION_SCHEDULE_PROVIDER_NAMESPACE` (optional; falls back to default provider namespace) +- `INGESTION_SCHEDULE_JOB_TYPE` (`incremental` or `full_sync`) +- `INGESTION_PREVENT_OVERLAP` (`0`/`1`) to skip obvious overlapping runs +- `INGESTION_OVERLAP_WINDOW_MINUTES` overlap guard window + +When enabled, Celery Beat enqueues the scheduled sync task on the configured cron. +The task uses the existing ingestion service path and writes run/error records in the same tables as manual sync. ## Provider Backend Selection diff --git a/apps/ingestion/admin.py b/apps/ingestion/admin.py index f863d44..e0a7dfb 100644 --- a/apps/ingestion/admin.py +++ b/apps/ingestion/admin.py @@ -1,5 +1,6 @@ from django.contrib import admin from django.contrib import messages +from django.db.models import Count from apps.providers.registry import get_default_provider_namespace @@ -20,7 +21,11 @@ class IngestionRunAdmin(admin.ModelAdmin): "job_type", "status", "records_processed", + "records_created", + "records_updated", "records_failed", + "error_count", + "short_error_summary", "started_at", "finished_at", ) @@ -38,6 +43,7 @@ class IngestionRunAdmin(admin.ModelAdmin): "records_created", "records_updated", "records_failed", + "error_summary", "context", "raw_payload", "created_at", @@ -79,6 +85,20 @@ class IngestionRunAdmin(admin.ModelAdmin): count += 1 self.message_user(request, f"Queued {count} retry task(s).", level=messages.SUCCESS) + def get_queryset(self, request): + queryset = super().get_queryset(request) + return queryset.annotate(_error_count=Count("errors")) + + @admin.display(ordering="_error_count", description="Errors") + def error_count(self, obj): + return getattr(obj, "_error_count", 0) + + @admin.display(description="Error summary") + def short_error_summary(self, obj): + if not obj.error_summary: + return "-" + return (obj.error_summary[:90] + "...") if len(obj.error_summary) > 90 else obj.error_summary + @admin.register(IngestionError) class IngestionErrorAdmin(admin.ModelAdmin): diff --git a/apps/ingestion/migrations/0002_ingestionrun_error_summary.py b/apps/ingestion/migrations/0002_ingestionrun_error_summary.py new file mode 100644 index 0000000..f8e355d --- /dev/null +++ b/apps/ingestion/migrations/0002_ingestionrun_error_summary.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.12 on 2026-03-10 16:12 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("ingestion", "0001_initial"), + ] + + operations = [ + migrations.AddField( + model_name="ingestionrun", + name="error_summary", + field=models.TextField(blank=True, default=""), + ), + ] diff --git a/apps/ingestion/models.py b/apps/ingestion/models.py index ba1f16b..9d42bbb 100644 --- a/apps/ingestion/models.py +++ b/apps/ingestion/models.py @@ -31,6 +31,7 @@ class IngestionRun(models.Model): records_created = models.PositiveIntegerField(default=0) records_updated = models.PositiveIntegerField(default=0) records_failed = models.PositiveIntegerField(default=0) + error_summary = models.TextField(blank=True, default="") context = models.JSONField(default=dict, blank=True) raw_payload = models.JSONField(default=dict, blank=True) created_at = models.DateTimeField(auto_now_add=True) diff --git a/apps/ingestion/services/runs.py b/apps/ingestion/services/runs.py index 028d41e..a540dad 100644 --- a/apps/ingestion/services/runs.py +++ b/apps/ingestion/services/runs.py @@ -1,3 +1,6 @@ +from datetime import timedelta + +from django.db.models import Q from django.utils import timezone from apps.ingestion.models import IngestionError, IngestionRun @@ -14,12 +17,22 @@ def start_ingestion_run(*, provider_namespace: str, job_type: str, triggered_by= ) -def finish_ingestion_run(*, run: IngestionRun, status: str, processed: int = 0, created: int = 0, updated: int = 0, failed: int = 0) -> IngestionRun: +def finish_ingestion_run( + *, + run: IngestionRun, + status: str, + processed: int = 0, + created: int = 0, + updated: int = 0, + failed: int = 0, + error_summary: str = "", +) -> IngestionRun: run.status = status run.records_processed = processed run.records_created = created run.records_updated = updated run.records_failed = failed + run.error_summary = error_summary run.finished_at = timezone.now() run.save( update_fields=[ @@ -28,12 +41,37 @@ def finish_ingestion_run(*, run: IngestionRun, status: str, processed: int = 0, "records_created", "records_updated", "records_failed", + "error_summary", "finished_at", ] ) return run +def mark_ingestion_run_skipped(*, provider_namespace: str, job_type: str, reason: str, context: dict | None = None) -> IngestionRun: + now = timezone.now() + run = IngestionRun.objects.create( + provider_namespace=provider_namespace, + job_type=job_type, + status=IngestionRun.RunStatus.CANCELED, + started_at=now, + finished_at=now, + error_summary=reason, + context=context or {}, + ) + return run + + +def has_running_ingestion_run(*, provider_namespace: str, job_type: str, within_minutes: int) -> bool: + cutoff = timezone.now() - timedelta(minutes=max(within_minutes, 1)) + return IngestionRun.objects.filter( + provider_namespace=provider_namespace, + job_type=job_type, + status=IngestionRun.RunStatus.RUNNING, + started_at__gte=cutoff, + ).filter(Q(finished_at__isnull=True) | Q(finished_at__gte=cutoff)).exists() + + def log_ingestion_error(*, run: IngestionRun, message: str, provider_namespace: str, severity: str = IngestionError.Severity.ERROR, entity_type: str = "", external_id: str = "", raw_payload: dict | None = None) -> IngestionError: return IngestionError.objects.create( ingestion_run=run, diff --git a/apps/ingestion/services/sync.py b/apps/ingestion/services/sync.py index 901e727..600fb04 100644 --- a/apps/ingestion/services/sync.py +++ b/apps/ingestion/services/sync.py @@ -427,6 +427,12 @@ def run_sync_job( context=context or {}, ) summary = SyncSummary() + logger.info( + "Starting ingestion run id=%s provider=%s job_type=%s", + run.id, + provider_namespace, + job_type, + ) try: provider = get_provider(provider_namespace) @@ -444,6 +450,9 @@ def run_sync_job( _sync_player_stats(provider_namespace, payload.get("player_stats", []), run, summary) _sync_player_careers(provider_namespace, payload.get("player_careers", []), run, summary) + success_error_summary = "" + if summary.failed > 0: + success_error_summary = f"Completed with {summary.failed} failed record(s)." finish_ingestion_run( run=run, status=IngestionRun.RunStatus.SUCCESS, @@ -451,6 +460,16 @@ def run_sync_job( created=summary.created, updated=summary.updated, failed=summary.failed, + error_summary=success_error_summary, + ) + logger.info( + "Completed ingestion run id=%s status=%s processed=%s created=%s updated=%s failed=%s", + run.id, + IngestionRun.RunStatus.SUCCESS, + summary.processed, + summary.created, + summary.updated, + summary.failed, ) return run @@ -471,6 +490,7 @@ def run_sync_job( created=summary.created, updated=summary.updated, failed=summary.failed + 1, + error_summary=f"Rate limit from provider: {exc}", ) raise @@ -490,6 +510,7 @@ def run_sync_job( created=summary.created, updated=summary.updated, failed=summary.failed + 1, + error_summary=f"Transient provider error: {exc}", ) raise @@ -509,5 +530,6 @@ def run_sync_job( created=summary.created, updated=summary.updated, failed=summary.failed + 1, + error_summary=f"Unhandled ingestion error: {exc}", ) raise diff --git a/apps/ingestion/tasks.py b/apps/ingestion/tasks.py index 90e12bf..d996e4d 100644 --- a/apps/ingestion/tasks.py +++ b/apps/ingestion/tasks.py @@ -1,8 +1,51 @@ +import logging + from celery import shared_task +from django.conf import settings from apps.ingestion.models import IngestionRun +from apps.ingestion.services.runs import has_running_ingestion_run, mark_ingestion_run_skipped from apps.ingestion.services.sync import run_sync_job from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError +from apps.providers.registry import get_default_provider_namespace + +logger = logging.getLogger(__name__) + + +def _run_sync_with_overlap_guard( + *, + provider_namespace: str, + job_type: str, + triggered_by_id: int | None = None, + context: dict | None = None, + cursor: str | None = None, +): + effective_context = context or {} + if settings.INGESTION_PREVENT_OVERLAP and has_running_ingestion_run( + provider_namespace=provider_namespace, + job_type=job_type, + within_minutes=settings.INGESTION_OVERLAP_WINDOW_MINUTES, + ): + reason = ( + f"Skipped due to overlapping running job for provider={provider_namespace}, " + f"job_type={job_type}." + ) + logger.warning(reason) + run = mark_ingestion_run_skipped( + provider_namespace=provider_namespace, + job_type=job_type, + reason=reason, + context=effective_context, + ) + return run.id + + return run_sync_job( + provider_namespace=provider_namespace, + job_type=job_type, + triggered_by_id=triggered_by_id, + context=effective_context, + cursor=cursor, + ).id @shared_task( @@ -13,12 +56,12 @@ from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientE retry_kwargs={"max_retries": 5}, ) def trigger_full_sync(self, provider_namespace: str, triggered_by_id: int | None = None, context: dict | None = None): - return run_sync_job( + return _run_sync_with_overlap_guard( provider_namespace=provider_namespace, job_type=IngestionRun.JobType.FULL_SYNC, triggered_by_id=triggered_by_id, context=context or {}, - ).id + ) @shared_task( @@ -29,10 +72,34 @@ def trigger_full_sync(self, provider_namespace: str, triggered_by_id: int | None retry_kwargs={"max_retries": 5}, ) def trigger_incremental_sync(self, provider_namespace: str, cursor: str | None = None, triggered_by_id: int | None = None, context: dict | None = None): - return run_sync_job( + return _run_sync_with_overlap_guard( provider_namespace=provider_namespace, job_type=IngestionRun.JobType.INCREMENTAL, triggered_by_id=triggered_by_id, context=context or {}, cursor=cursor, - ).id + ) + + +@shared_task( + bind=True, + name="apps.ingestion.tasks.scheduled_provider_sync", + autoretry_for=(ProviderRateLimitError, ProviderTransientError), + retry_backoff=True, + retry_jitter=True, + retry_kwargs={"max_retries": 5}, +) +def scheduled_provider_sync(self): + provider_namespace = settings.INGESTION_SCHEDULE_PROVIDER_NAMESPACE or get_default_provider_namespace() + context = {"trigger": "celery_beat", "task_id": self.request.id} + if settings.INGESTION_SCHEDULE_JOB_TYPE == IngestionRun.JobType.FULL_SYNC: + return _run_sync_with_overlap_guard( + provider_namespace=provider_namespace, + job_type=IngestionRun.JobType.FULL_SYNC, + context=context, + ) + return _run_sync_with_overlap_guard( + provider_namespace=provider_namespace, + job_type=IngestionRun.JobType.INCREMENTAL, + context=context, + ) diff --git a/config/celery.py b/config/celery.py index 0f0d798..dff93d5 100644 --- a/config/celery.py +++ b/config/celery.py @@ -1,8 +1,41 @@ import os from celery import Celery +from celery.schedules import crontab +from django.conf import settings os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.development") app = Celery("hoopscout") app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks() + + +def _parse_cron_expression(expression: str) -> dict[str, str]: + parts = expression.split() + if len(parts) != 5: + raise ValueError( + "INGESTION_SCHEDULE_CRON must have 5 fields: minute hour day_of_month month_of_year day_of_week." + ) + return { + "minute": parts[0], + "hour": parts[1], + "day_of_month": parts[2], + "month_of_year": parts[3], + "day_of_week": parts[4], + } + + +def build_periodic_schedule() -> dict: + if not settings.INGESTION_SCHEDULE_ENABLED: + return {} + + schedule_kwargs = _parse_cron_expression(settings.INGESTION_SCHEDULE_CRON) + return { + "ingestion.scheduled_provider_sync": { + "task": "apps.ingestion.tasks.scheduled_provider_sync", + "schedule": crontab(**schedule_kwargs), + } + } + + +app.conf.beat_schedule = build_periodic_schedule() diff --git a/config/settings/base.py b/config/settings/base.py index db5cf54..25b3c73 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -124,6 +124,15 @@ CELERY_RESULT_SERIALIZER = "json" CELERY_TIMEZONE = TIME_ZONE CELERY_TASK_TIME_LIMIT = int(os.getenv("CELERY_TASK_TIME_LIMIT", "1800")) CELERY_TASK_SOFT_TIME_LIMIT = int(os.getenv("CELERY_TASK_SOFT_TIME_LIMIT", "1500")) +INGESTION_SCHEDULE_ENABLED = env_bool("INGESTION_SCHEDULE_ENABLED", False) +INGESTION_SCHEDULE_CRON = os.getenv("INGESTION_SCHEDULE_CRON", "*/30 * * * *").strip() +INGESTION_SCHEDULE_PROVIDER_NAMESPACE = os.getenv("INGESTION_SCHEDULE_PROVIDER_NAMESPACE", "").strip() +INGESTION_SCHEDULE_JOB_TYPE = os.getenv("INGESTION_SCHEDULE_JOB_TYPE", "incremental").strip().lower() +INGESTION_PREVENT_OVERLAP = env_bool("INGESTION_PREVENT_OVERLAP", True) +INGESTION_OVERLAP_WINDOW_MINUTES = int(os.getenv("INGESTION_OVERLAP_WINDOW_MINUTES", "180")) + +if INGESTION_SCHEDULE_JOB_TYPE not in {"incremental", "full_sync"}: + raise ImproperlyConfigured("INGESTION_SCHEDULE_JOB_TYPE must be either 'incremental' or 'full_sync'.") PROVIDER_BACKEND = os.getenv("PROVIDER_BACKEND", "demo").strip().lower() PROVIDER_NAMESPACE_DEMO = os.getenv("PROVIDER_NAMESPACE_DEMO", "mvp_demo") diff --git a/tests/test_ingestion_sync.py b/tests/test_ingestion_sync.py index 50057b2..9f99ca5 100644 --- a/tests/test_ingestion_sync.py +++ b/tests/test_ingestion_sync.py @@ -67,6 +67,10 @@ def test_incremental_sync_runs_successfully(settings): assert run.status == IngestionRun.RunStatus.SUCCESS assert run.records_processed > 0 + assert run.started_at is not None + assert run.finished_at is not None + assert run.finished_at >= run.started_at + assert run.error_summary == "" @pytest.mark.django_db @@ -80,6 +84,9 @@ def test_run_sync_handles_rate_limit(settings): run = IngestionRun.objects.order_by("-id").first() assert run is not None assert run.status == IngestionRun.RunStatus.FAILED + assert run.started_at is not None + assert run.finished_at is not None + assert "Rate limit" in run.error_summary assert IngestionError.objects.filter(ingestion_run=run).exists() os.environ.pop("PROVIDER_MVP_FORCE_RATE_LIMIT", None) diff --git a/tests/test_ingestion_tasks.py b/tests/test_ingestion_tasks.py new file mode 100644 index 0000000..bf6d9e3 --- /dev/null +++ b/tests/test_ingestion_tasks.py @@ -0,0 +1,69 @@ +import pytest +from celery.schedules import crontab +from django.utils import timezone + +from apps.ingestion.models import IngestionRun +from apps.ingestion.tasks import scheduled_provider_sync, trigger_incremental_sync +from config.celery import app as celery_app, build_periodic_schedule + + +@pytest.mark.django_db +def test_periodic_task_registered(): + assert "apps.ingestion.tasks.scheduled_provider_sync" in celery_app.tasks + + +@pytest.mark.django_db +def test_build_periodic_schedule_enabled(settings): + settings.INGESTION_SCHEDULE_ENABLED = True + settings.INGESTION_SCHEDULE_CRON = "15 * * * *" + + schedule = build_periodic_schedule() + assert "ingestion.scheduled_provider_sync" in schedule + entry = schedule["ingestion.scheduled_provider_sync"] + assert entry["task"] == "apps.ingestion.tasks.scheduled_provider_sync" + assert isinstance(entry["schedule"], crontab) + assert entry["schedule"]._orig_minute == "15" + + +@pytest.mark.django_db +def test_build_periodic_schedule_disabled(settings): + settings.INGESTION_SCHEDULE_ENABLED = False + assert build_periodic_schedule() == {} + + +@pytest.mark.django_db +def test_trigger_incremental_sync_skips_overlapping_runs(settings): + settings.INGESTION_PREVENT_OVERLAP = True + settings.INGESTION_OVERLAP_WINDOW_MINUTES = 180 + + IngestionRun.objects.create( + provider_namespace="mvp_demo", + job_type=IngestionRun.JobType.INCREMENTAL, + status=IngestionRun.RunStatus.RUNNING, + started_at=timezone.now(), + ) + + run_id = trigger_incremental_sync.apply( + kwargs={"provider_namespace": "mvp_demo"}, + ).get() + skipped_run = IngestionRun.objects.get(id=run_id) + assert skipped_run.status == IngestionRun.RunStatus.CANCELED + assert "overlapping running job" in skipped_run.error_summary + + +@pytest.mark.django_db +def test_scheduled_provider_sync_uses_configured_job_type(settings, monkeypatch): + settings.INGESTION_SCHEDULE_JOB_TYPE = IngestionRun.JobType.FULL_SYNC + settings.INGESTION_SCHEDULE_PROVIDER_NAMESPACE = "mvp_demo" + captured = {} + + def fake_runner(**kwargs): + captured.update(kwargs) + return 99 + + monkeypatch.setattr("apps.ingestion.tasks._run_sync_with_overlap_guard", fake_runner) + + result = scheduled_provider_sync.apply().get() + assert result == 99 + assert captured["provider_namespace"] == "mvp_demo" + assert captured["job_type"] == IngestionRun.JobType.FULL_SYNC