"""Subprocess-based check that a bad cron setting does not break importing config."""

import os
import subprocess
import sys

import pytest


def _run_python_import(
    code: str,
    env_overrides: dict[str, str],
    *,
    timeout: float = 120.0,
) -> subprocess.CompletedProcess[str]:
    """Execute ``code`` via ``python -c`` in a fresh child interpreter.

    The child inherits a copy of the current process environment with
    ``env_overrides`` applied on top. Output is captured as text and a
    non-zero exit status is NOT raised, so the caller can inspect it.

    Args:
        code: Python source passed to ``sys.executable -c``.
        env_overrides: Environment variables to set or replace for the child.
        timeout: Seconds to wait for the child before giving up.

    Returns:
        The completed process, with ``returncode``, ``stdout`` and ``stderr``.

    Raises:
        subprocess.TimeoutExpired: If the child does not finish in ``timeout``
            seconds (the child is killed first).
    """
    child_env = {**os.environ, **env_overrides}
    return subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,
        text=True,
        env=child_env,
        check=False,
        # Without a bound, a child that hangs during import would stall the
        # whole test run instead of failing.
        timeout=timeout,
    )


@pytest.mark.django_db
def test_invalid_cron_does_not_crash_config_import_path():
    """Importing config with an invalid cron value exits 0 with an empty schedule.

    The import runs in a child interpreter so the environment overrides are
    in place before ``config`` is first imported.
    """
    result = _run_python_import(
        (
            "import config; "
            "from config.celery import app; "
            "print(f'beat_schedule_size={len(app.conf.beat_schedule or {})}')"
        ),
        {
            "DJANGO_SETTINGS_MODULE": "config.settings.development",
            "DJANGO_ENV": "development",
            "DJANGO_DEBUG": "1",
            "INGESTION_SCHEDULE_ENABLED": "1",
            "INGESTION_SCHEDULE_CRON": "bad cron value",
        },
    )

    # Surface the child's output on failure; otherwise the traceback that
    # explains a crash is captured and silently discarded.
    assert result.returncode == 0, (
        f"child interpreter exited with {result.returncode}\n"
        f"stdout:\n{result.stdout}\n"
        f"stderr:\n{result.stderr}"
    )
    assert "beat_schedule_size=0" in result.stdout, (
        "expected an empty beat schedule\n"
        f"stdout:\n{result.stdout}\n"
        f"stderr:\n{result.stderr}"
    )