import os from celery import Celery from celery.schedules import crontab from django.conf import settings os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.development") app = Celery("hoopscout") app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks() def _parse_cron_expression(expression: str) -> dict[str, str]: parts = expression.split() if len(parts) != 5: raise ValueError( "INGESTION_SCHEDULE_CRON must have 5 fields: minute hour day_of_month month_of_year day_of_week." ) return { "minute": parts[0], "hour": parts[1], "day_of_month": parts[2], "month_of_year": parts[3], "day_of_week": parts[4], } def build_periodic_schedule() -> dict: if not settings.INGESTION_SCHEDULE_ENABLED: return {} schedule_kwargs = _parse_cron_expression(settings.INGESTION_SCHEDULE_CRON) return { "ingestion.scheduled_provider_sync": { "task": "apps.ingestion.tasks.scheduled_provider_sync", "schedule": crontab(**schedule_kwargs), } } app.conf.beat_schedule = build_periodic_schedule()