Files
hoopscout/apps/ingestion/tasks.py
2026-03-10 13:44:36 +01:00

106 lines
3.4 KiB
Python

import logging
from celery import shared_task
from django.conf import settings
from apps.ingestion.models import IngestionRun
from apps.ingestion.services.runs import has_running_ingestion_run, mark_ingestion_run_skipped
from apps.ingestion.services.sync import run_sync_job
from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError
from apps.providers.registry import get_default_provider_namespace
logger = logging.getLogger(__name__)
def _run_sync_with_overlap_guard(
    *,
    provider_namespace: str,
    job_type: str,
    triggered_by_id: int | None = None,
    context: dict | None = None,
    cursor: str | None = None,
):
    """Start a sync job unless the overlap guard reports a running one.

    When ``settings.INGESTION_PREVENT_OVERLAP`` is truthy and
    ``has_running_ingestion_run`` reports a run for the same provider and
    job type inside ``settings.INGESTION_OVERLAP_WINDOW_MINUTES``, the sync
    is not started: a warning is logged and ``mark_ingestion_run_skipped``
    is called instead. Otherwise ``run_sync_job`` is invoked.

    Args:
        provider_namespace: Provider identifier forwarded to the services.
        job_type: Job type forwarded to the services.
        triggered_by_id: Forwarded to ``run_sync_job`` only; it is not
            passed along on the skip path.
        context: Optional context dict; a falsy value is replaced by ``{}``.
        cursor: Forwarded to ``run_sync_job`` only.

    Returns:
        The ``id`` attribute of the object returned by whichever service
        call was made (``run_sync_job`` or ``mark_ingestion_run_skipped``).
    """
    run_context = context or {}

    # ``and`` short-circuits, so the running-job lookup (and the window
    # setting) is only touched when the guard is enabled.
    overlap_detected = settings.INGESTION_PREVENT_OVERLAP and has_running_ingestion_run(
        provider_namespace=provider_namespace,
        job_type=job_type,
        within_minutes=settings.INGESTION_OVERLAP_WINDOW_MINUTES,
    )

    if not overlap_detected:
        started_run = run_sync_job(
            provider_namespace=provider_namespace,
            job_type=job_type,
            triggered_by_id=triggered_by_id,
            context=run_context,
            cursor=cursor,
        )
        return started_run.id

    # The same message is both logged and handed to the skip helper.
    skip_reason = (
        f"Skipped due to overlapping running job for provider={provider_namespace}, "
        f"job_type={job_type}."
    )
    logger.warning(skip_reason)
    skipped_run = mark_ingestion_run_skipped(
        provider_namespace=provider_namespace,
        job_type=job_type,
        reason=skip_reason,
        context=run_context,
    )
    return skipped_run.id
@shared_task(
    bind=True,
    autoretry_for=(ProviderRateLimitError, ProviderTransientError),
    retry_backoff=True,
    retry_jitter=True,
    retry_kwargs={"max_retries": 5},
)
def trigger_full_sync(self, provider_namespace: str, triggered_by_id: int | None = None, context: dict | None = None):
    """Celery task: request a full sync for ``provider_namespace``.

    Delegates to ``_run_sync_with_overlap_guard`` with the ``FULL_SYNC`` job
    type and returns whatever id that helper returns. The decorator sets the
    task to auto-retry on ``ProviderRateLimitError`` and
    ``ProviderTransientError`` with ``max_retries`` of 5, with backoff and
    jitter enabled.

    Args:
        provider_namespace: Provider identifier to sync.
        triggered_by_id: Optional id forwarded to the helper.
        context: Optional context dict; a falsy value becomes ``{}``.
    """
    full_sync_type = IngestionRun.JobType.FULL_SYNC
    task_context = context or {}
    return _run_sync_with_overlap_guard(
        provider_namespace=provider_namespace,
        job_type=full_sync_type,
        triggered_by_id=triggered_by_id,
        context=task_context,
    )
@shared_task(
    bind=True,
    autoretry_for=(ProviderRateLimitError, ProviderTransientError),
    retry_backoff=True,
    retry_jitter=True,
    retry_kwargs={"max_retries": 5},
)
def trigger_incremental_sync(self, provider_namespace: str, cursor: str | None = None, triggered_by_id: int | None = None, context: dict | None = None):
    """Celery task: request an incremental sync for ``provider_namespace``.

    Delegates to ``_run_sync_with_overlap_guard`` with the ``INCREMENTAL``
    job type and returns whatever id that helper returns. The decorator sets
    the task to auto-retry on ``ProviderRateLimitError`` and
    ``ProviderTransientError`` with ``max_retries`` of 5, with backoff and
    jitter enabled.

    Args:
        provider_namespace: Provider identifier to sync.
        cursor: Optional cursor forwarded unchanged to the helper.
        triggered_by_id: Optional id forwarded to the helper.
        context: Optional context dict; a falsy value becomes ``{}``.
    """
    incremental_type = IngestionRun.JobType.INCREMENTAL
    task_context = context or {}
    return _run_sync_with_overlap_guard(
        provider_namespace=provider_namespace,
        job_type=incremental_type,
        triggered_by_id=triggered_by_id,
        context=task_context,
        cursor=cursor,
    )
@shared_task(
    bind=True,
    name="apps.ingestion.tasks.scheduled_provider_sync",
    autoretry_for=(ProviderRateLimitError, ProviderTransientError),
    retry_backoff=True,
    retry_jitter=True,
    retry_kwargs={"max_retries": 5},
)
def scheduled_provider_sync(self):
    """Celery task: run the sync configured in Django settings.

    The provider comes from ``settings.INGESTION_SCHEDULE_PROVIDER_NAMESPACE``,
    falling back to ``get_default_provider_namespace()`` when that setting is
    falsy. A full sync runs only when ``settings.INGESTION_SCHEDULE_JOB_TYPE``
    equals ``IngestionRun.JobType.FULL_SYNC``; any other value results in an
    incremental sync.

    Returns:
        The id returned by ``_run_sync_with_overlap_guard``.
    """
    provider_namespace = settings.INGESTION_SCHEDULE_PROVIDER_NAMESPACE or get_default_provider_namespace()

    # Tag the run with its origin and the id of this task invocation.
    context = {"trigger": "celery_beat", "task_id": self.request.id}

    # Everything that is not explicitly FULL_SYNC is treated as incremental.
    scheduled_job_type = (
        IngestionRun.JobType.FULL_SYNC
        if settings.INGESTION_SCHEDULE_JOB_TYPE == IngestionRun.JobType.FULL_SYNC
        else IngestionRun.JobType.INCREMENTAL
    )

    return _run_sync_with_overlap_guard(
        provider_namespace=provider_namespace,
        job_type=scheduled_job_type,
        context=context,
    )