"""Celery tasks that start provider ingestion runs, guarded against overlap."""

import logging

from celery import shared_task
from django.conf import settings

from apps.ingestion.models import IngestionRun
from apps.ingestion.services.runs import has_running_ingestion_run, mark_ingestion_run_skipped
from apps.ingestion.services.sync import run_sync_job
from apps.providers.exceptions import ProviderRateLimitError, ProviderTransientError
from apps.providers.registry import get_default_provider_namespace

logger = logging.getLogger(__name__)


def _run_sync_with_overlap_guard(
    *,
    provider_namespace: str,
    job_type: str,
    triggered_by_id: int | None = None,
    context: dict | None = None,
    cursor: str | None = None,
):
    """Run a sync job unless overlap prevention finds one already running.

    When ``settings.INGESTION_PREVENT_OVERLAP`` is truthy and
    ``has_running_ingestion_run`` reports a running job for the same provider
    and job type within ``settings.INGESTION_OVERLAP_WINDOW_MINUTES``, a
    warning is logged, a skipped run is recorded via
    ``mark_ingestion_run_skipped`` and its id is returned. Otherwise the work
    is delegated to ``run_sync_job``.

    Args:
        provider_namespace: Provider the job is run for.
        job_type: Job type forwarded to the overlap check and the sync job.
        triggered_by_id: Forwarded to ``run_sync_job`` only; the skip path
            does not pass it on.
        context: Optional context mapping; a falsy value is replaced by ``{}``.
        cursor: Forwarded to ``run_sync_job`` only.

    Returns:
        The ``id`` attribute of the run object returned by whichever service
        was called.
    """
    run_context = context or {}

    # ``and`` short-circuits: the running-job lookup is only performed when
    # overlap prevention is switched on in settings.
    overlap_detected = settings.INGESTION_PREVENT_OVERLAP and has_running_ingestion_run(
        provider_namespace=provider_namespace,
        job_type=job_type,
        within_minutes=settings.INGESTION_OVERLAP_WINDOW_MINUTES,
    )

    if not overlap_detected:
        started_run = run_sync_job(
            provider_namespace=provider_namespace,
            job_type=job_type,
            triggered_by_id=triggered_by_id,
            context=run_context,
            cursor=cursor,
        )
        return started_run.id

    skip_reason = (
        f"Skipped due to overlapping running job for provider={provider_namespace}, "
        f"job_type={job_type}."
    )
    logger.warning(skip_reason)
    skipped_run = mark_ingestion_run_skipped(
        provider_namespace=provider_namespace,
        job_type=job_type,
        reason=skip_reason,
        context=run_context,
    )
    return skipped_run.id


@shared_task(
    bind=True,
    autoretry_for=(ProviderRateLimitError, ProviderTransientError),
    retry_backoff=True,
    retry_jitter=True,
    retry_kwargs={"max_retries": 5},
)
def trigger_full_sync(
    self,
    provider_namespace: str,
    triggered_by_id: int | None = None,
    context: dict | None = None,
):
    """Start a full sync for ``provider_namespace`` behind the overlap guard.

    Configured to auto-retry on ``ProviderRateLimitError`` and
    ``ProviderTransientError`` with backoff, jitter and ``max_retries`` of 5.

    Returns:
        The id produced by ``_run_sync_with_overlap_guard``.
    """
    guard_kwargs = {
        "provider_namespace": provider_namespace,
        "job_type": IngestionRun.JobType.FULL_SYNC,
        "triggered_by_id": triggered_by_id,
        "context": context or {},
    }
    return _run_sync_with_overlap_guard(**guard_kwargs)


@shared_task(
    bind=True,
    autoretry_for=(ProviderRateLimitError, ProviderTransientError),
    retry_backoff=True,
    retry_jitter=True,
    retry_kwargs={"max_retries": 5},
)
def trigger_incremental_sync(
    self,
    provider_namespace: str,
    cursor: str | None = None,
    triggered_by_id: int | None = None,
    context: dict | None = None,
):
    """Start an incremental sync for ``provider_namespace`` behind the overlap guard.

    ``cursor`` is forwarded unchanged to the guard. Configured to auto-retry
    on ``ProviderRateLimitError`` and ``ProviderTransientError`` with backoff,
    jitter and ``max_retries`` of 5.

    Returns:
        The id produced by ``_run_sync_with_overlap_guard``.
    """
    guard_kwargs = {
        "provider_namespace": provider_namespace,
        "job_type": IngestionRun.JobType.INCREMENTAL,
        "triggered_by_id": triggered_by_id,
        "context": context or {},
        "cursor": cursor,
    }
    return _run_sync_with_overlap_guard(**guard_kwargs)


@shared_task(
    bind=True,
    name="apps.ingestion.tasks.scheduled_provider_sync",
    autoretry_for=(ProviderRateLimitError, ProviderTransientError),
    retry_backoff=True,
    retry_jitter=True,
    retry_kwargs={"max_retries": 5},
)
def scheduled_provider_sync(self):
    """Run the settings-configured sync for the scheduled provider.

    The provider comes from ``settings.INGESTION_SCHEDULE_PROVIDER_NAMESPACE``,
    falling back to ``get_default_provider_namespace()`` when that setting is
    falsy. A full sync runs when ``settings.INGESTION_SCHEDULE_JOB_TYPE``
    equals ``IngestionRun.JobType.FULL_SYNC``; any other value results in an
    incremental sync.

    Returns:
        The id produced by ``_run_sync_with_overlap_guard``.
    """
    namespace = settings.INGESTION_SCHEDULE_PROVIDER_NAMESPACE or get_default_provider_namespace()
    beat_context = {"trigger": "celery_beat", "task_id": self.request.id}

    full_sync_requested = settings.INGESTION_SCHEDULE_JOB_TYPE == IngestionRun.JobType.FULL_SYNC
    if full_sync_requested:
        selected_job_type = IngestionRun.JobType.FULL_SYNC
    else:
        selected_job_type = IngestionRun.JobType.INCREMENTAL

    return _run_sync_with_overlap_guard(
        provider_namespace=namespace,
        job_type=selected_job_type,
        context=beat_context,
    )