Wire Celery Beat periodic sync with ingestion run tracking
This commit is contained in:
@ -1,5 +1,6 @@
|
||||
from django.contrib import admin
|
||||
from django.contrib import messages
|
||||
from django.db.models import Count
|
||||
|
||||
from apps.providers.registry import get_default_provider_namespace
|
||||
|
||||
@ -20,7 +21,11 @@ class IngestionRunAdmin(admin.ModelAdmin):
|
||||
"job_type",
|
||||
"status",
|
||||
"records_processed",
|
||||
"records_created",
|
||||
"records_updated",
|
||||
"records_failed",
|
||||
"error_count",
|
||||
"short_error_summary",
|
||||
"started_at",
|
||||
"finished_at",
|
||||
)
|
||||
@ -38,6 +43,7 @@ class IngestionRunAdmin(admin.ModelAdmin):
|
||||
"records_created",
|
||||
"records_updated",
|
||||
"records_failed",
|
||||
"error_summary",
|
||||
"context",
|
||||
"raw_payload",
|
||||
"created_at",
|
||||
@ -79,6 +85,20 @@ class IngestionRunAdmin(admin.ModelAdmin):
|
||||
count += 1
|
||||
self.message_user(request, f"Queued {count} retry task(s).", level=messages.SUCCESS)
|
||||
|
||||
def get_queryset(self, request):
|
||||
queryset = super().get_queryset(request)
|
||||
return queryset.annotate(_error_count=Count("errors"))
|
||||
|
||||
@admin.display(ordering="_error_count", description="Errors")
|
||||
def error_count(self, obj):
|
||||
return getattr(obj, "_error_count", 0)
|
||||
|
||||
@admin.display(description="Error summary")
|
||||
def short_error_summary(self, obj):
|
||||
if not obj.error_summary:
|
||||
return "-"
|
||||
return (obj.error_summary[:90] + "...") if len(obj.error_summary) > 90 else obj.error_summary
|
||||
|
||||
|
||||
@admin.register(IngestionError)
|
||||
class IngestionErrorAdmin(admin.ModelAdmin):
|
||||
|
||||
18
apps/ingestion/migrations/0002_ingestionrun_error_summary.py
Normal file
18
apps/ingestion/migrations/0002_ingestionrun_error_summary.py
Normal file
@ -0,0 +1,18 @@
|
||||
# Generated by Django 5.2.12 on 2026-03-10 16:12
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("ingestion", "0001_initial"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name="ingestionrun",
|
||||
name="error_summary",
|
||||
field=models.TextField(blank=True, default=""),
|
||||
),
|
||||
]
|
||||
@ -31,6 +31,7 @@ class IngestionRun(models.Model):
|
||||
records_created = models.PositiveIntegerField(default=0)
|
||||
records_updated = models.PositiveIntegerField(default=0)
|
||||
records_failed = models.PositiveIntegerField(default=0)
|
||||
error_summary = models.TextField(blank=True, default="")
|
||||
context = models.JSONField(default=dict, blank=True)
|
||||
raw_payload = models.JSONField(default=dict, blank=True)
|
||||
created_at = models.DateTimeField(auto_now_add=True)
|
||||
|
||||
@ -1,3 +1,6 @@
|
||||
from datetime import timedelta
|
||||
|
||||
from django.db.models import Q
|
||||
from django.utils import timezone
|
||||
|
||||
from apps.ingestion.models import IngestionError, IngestionRun
|
||||
@ -14,12 +17,22 @@ def start_ingestion_run(*, provider_namespace: str, job_type: str, triggered_by=
|
||||
)
|
||||
|
||||
|
||||
def finish_ingestion_run(*, run: IngestionRun, status: str, processed: int = 0, created: int = 0, updated: int = 0, failed: int = 0) -> IngestionRun:
|
||||
def finish_ingestion_run(
|
||||
*,
|
||||
run: IngestionRun,
|
||||
status: str,
|
||||
processed: int = 0,
|
||||
created: int = 0,
|
||||
updated: int = 0,
|
||||
failed: int = 0,
|
||||
error_summary: str = "",
|
||||
) -> IngestionRun:
|
||||
run.status = status
|
||||
run.records_processed = processed
|
||||
run.records_created = created
|
||||
run.records_updated = updated
|
||||
run.records_failed = failed
|
||||
run.error_summary = error_summary
|
||||
run.finished_at = timezone.now()
|
||||
run.save(
|
||||
update_fields=[
|
||||
@ -28,12 +41,37 @@ def finish_ingestion_run(*, run: IngestionRun, status: str, processed: int = 0,
|
||||
"records_created",
|
||||
"records_updated",
|
||||
"records_failed",
|
||||
"error_summary",
|
||||
"finished_at",
|
||||
]
|
||||
)
|
||||
return run
|
||||
|
||||
|
||||
def mark_ingestion_run_skipped(*, provider_namespace: str, job_type: str, reason: str, context: dict | None = None) -> IngestionRun:
|
||||
now = timezone.now()
|
||||
run = IngestionRun.objects.create(
|
||||
provider_namespace=provider_namespace,
|
||||
job_type=job_type,
|
||||
status=IngestionRun.RunStatus.CANCELED,
|
||||
started_at=now,
|
||||
finished_at=now,
|
||||
error_summary=reason,
|
||||
context=context or {},
|
||||
)
|
||||
return run
|
||||
|
||||
|
||||
def has_running_ingestion_run(*, provider_namespace: str, job_type: str, within_minutes: int) -> bool:
|
||||
cutoff = timezone.now() - timedelta(minutes=max(within_minutes, 1))
|
||||
return IngestionRun.objects.filter(
|
||||
provider_namespace=provider_namespace,
|
||||
job_type=job_type,
|
||||
status=IngestionRun.RunStatus.RUNNING,
|
||||
started_at__gte=cutoff,
|
||||
).filter(Q(finished_at__isnull=True) | Q(finished_at__gte=cutoff)).exists()
|
||||
|
||||
|
||||
def log_ingestion_error(*, run: IngestionRun, message: str, provider_namespace: str, severity: str = IngestionError.Severity.ERROR, entity_type: str = "", external_id: str = "", raw_payload: dict | None = None) -> IngestionError:
|
||||
return IngestionError.objects.create(
|
||||
ingestion_run=run,
|
||||
|
||||
@ -427,6 +427,12 @@ def run_sync_job(
|
||||
context=context or {},
|
||||
)
|
||||
summary = SyncSummary()
|
||||
logger.info(
|
||||
"Starting ingestion run id=%s provider=%s job_type=%s",
|
||||
run.id,
|
||||
provider_namespace,
|
||||
job_type,
|
||||
)
|
||||
|
||||
try:
|
||||
provider = get_provider(provider_namespace)
|
||||
@ -444,6 +450,9 @@ def run_sync_job(
|
||||
_sync_player_stats(provider_namespace, payload.get("player_stats", []), run, summary)
|
||||
_sync_player_careers(provider_namespace, payload.get("player_careers", []), run, summary)
|
||||
|
||||
success_error_summary = ""
|
||||
if summary.failed > 0:
|
||||
success_error_summary = f"Completed with {summary.failed} failed record(s)."
|
||||
finish_ingestion_run(
|
||||
run=run,
|
||||
status=IngestionRun.RunStatus.SUCCESS,
|
||||
@ -451,6 +460,16 @@ def run_sync_job(
|
||||
created=summary.created,
|
||||
updated=summary.updated,
|
||||
failed=summary.failed,
|
||||
error_summary=success_error_summary,
|
||||
)
|
||||
logger.info(
|
||||
"Completed ingestion run id=%s status=%s processed=%s created=%s updated=%s failed=%s",
|
||||
run.id,
|
||||
IngestionRun.RunStatus.SUCCESS,
|
||||
summary.processed,
|
||||
summary.created,
|
||||
summary.updated,
|
||||
summary.failed,
|
||||
)
|
||||
return run
|
||||
|
||||
@ -471,6 +490,7 @@ def run_sync_job(
|
||||
created=summary.created,
|
||||
updated=summary.updated,
|
||||
failed=summary.failed + 1,
|
||||
error_summary=f"Rate limit from provider: {exc}",
|
||||
)
|
||||
raise
|
||||
|
||||
@ -490,6 +510,7 @@ def run_sync_job(
|
||||
created=summary.created,
|
||||
updated=summary.updated,
|
||||
failed=summary.failed + 1,
|
||||
error_summary=f"Transient provider error: {exc}",
|
||||
)
|
||||
raise
|
||||
|
||||
@ -509,5 +530,6 @@ def run_sync_job(
|
||||
created=summary.created,
|
||||
updated=summary.updated,
|
||||
failed=summary.failed + 1,
|
||||
error_summary=f"Unhandled ingestion error: {exc}",
|
||||
)
|
||||
raise
|
||||
|
||||
@ -1,8 +1,51 @@
|
||||
import logging
|
||||
|
||||
from celery import shared_task
|
||||
from django.conf import settings
|
||||
|
||||
from apps.ingestion.models import IngestionRun
|
||||
from apps.ingestion.services.runs import has_running_ingestion_run, mark_ingestion_run_skipped
|
||||
from apps.ingestion.services.sync import run_sync_job
|
||||
from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError
|
||||
from apps.providers.registry import get_default_provider_namespace
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _run_sync_with_overlap_guard(
|
||||
*,
|
||||
provider_namespace: str,
|
||||
job_type: str,
|
||||
triggered_by_id: int | None = None,
|
||||
context: dict | None = None,
|
||||
cursor: str | None = None,
|
||||
):
|
||||
effective_context = context or {}
|
||||
if settings.INGESTION_PREVENT_OVERLAP and has_running_ingestion_run(
|
||||
provider_namespace=provider_namespace,
|
||||
job_type=job_type,
|
||||
within_minutes=settings.INGESTION_OVERLAP_WINDOW_MINUTES,
|
||||
):
|
||||
reason = (
|
||||
f"Skipped due to overlapping running job for provider={provider_namespace}, "
|
||||
f"job_type={job_type}."
|
||||
)
|
||||
logger.warning(reason)
|
||||
run = mark_ingestion_run_skipped(
|
||||
provider_namespace=provider_namespace,
|
||||
job_type=job_type,
|
||||
reason=reason,
|
||||
context=effective_context,
|
||||
)
|
||||
return run.id
|
||||
|
||||
return run_sync_job(
|
||||
provider_namespace=provider_namespace,
|
||||
job_type=job_type,
|
||||
triggered_by_id=triggered_by_id,
|
||||
context=effective_context,
|
||||
cursor=cursor,
|
||||
).id
|
||||
|
||||
|
||||
@shared_task(
|
||||
@ -13,12 +56,12 @@ from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientE
|
||||
retry_kwargs={"max_retries": 5},
|
||||
)
|
||||
def trigger_full_sync(self, provider_namespace: str, triggered_by_id: int | None = None, context: dict | None = None):
|
||||
return run_sync_job(
|
||||
return _run_sync_with_overlap_guard(
|
||||
provider_namespace=provider_namespace,
|
||||
job_type=IngestionRun.JobType.FULL_SYNC,
|
||||
triggered_by_id=triggered_by_id,
|
||||
context=context or {},
|
||||
).id
|
||||
)
|
||||
|
||||
|
||||
@shared_task(
|
||||
@ -29,10 +72,34 @@ def trigger_full_sync(self, provider_namespace: str, triggered_by_id: int | None
|
||||
retry_kwargs={"max_retries": 5},
|
||||
)
|
||||
def trigger_incremental_sync(self, provider_namespace: str, cursor: str | None = None, triggered_by_id: int | None = None, context: dict | None = None):
|
||||
return run_sync_job(
|
||||
return _run_sync_with_overlap_guard(
|
||||
provider_namespace=provider_namespace,
|
||||
job_type=IngestionRun.JobType.INCREMENTAL,
|
||||
triggered_by_id=triggered_by_id,
|
||||
context=context or {},
|
||||
cursor=cursor,
|
||||
).id
|
||||
)
|
||||
|
||||
|
||||
@shared_task(
|
||||
bind=True,
|
||||
name="apps.ingestion.tasks.scheduled_provider_sync",
|
||||
autoretry_for=(ProviderRateLimitError, ProviderTransientError),
|
||||
retry_backoff=True,
|
||||
retry_jitter=True,
|
||||
retry_kwargs={"max_retries": 5},
|
||||
)
|
||||
def scheduled_provider_sync(self):
|
||||
provider_namespace = settings.INGESTION_SCHEDULE_PROVIDER_NAMESPACE or get_default_provider_namespace()
|
||||
context = {"trigger": "celery_beat", "task_id": self.request.id}
|
||||
if settings.INGESTION_SCHEDULE_JOB_TYPE == IngestionRun.JobType.FULL_SYNC:
|
||||
return _run_sync_with_overlap_guard(
|
||||
provider_namespace=provider_namespace,
|
||||
job_type=IngestionRun.JobType.FULL_SYNC,
|
||||
context=context,
|
||||
)
|
||||
return _run_sync_with_overlap_guard(
|
||||
provider_namespace=provider_namespace,
|
||||
job_type=IngestionRun.JobType.INCREMENTAL,
|
||||
context=context,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user