"""Smoke tests for the scheduler script's disabled mode and its compose wiring."""

from __future__ import annotations

import os
import subprocess
import time
from pathlib import Path


def _repo_root() -> Path:
    """Return the parent of the directory that contains this test file.

    The tests below treat this directory as the repository root: it is used
    as the working directory for ``scripts/scheduler.sh`` and as the base for
    locating ``docker-compose.yml``.
    """
    return Path(__file__).resolve().parent.parent


def test_scheduler_disabled_mode_stays_alive_without_exit_loop():
    """With ``SCHEDULER_ENABLED=0`` the script must still be running after 1s.

    The script is launched with a 30-second disabled-sleep interval and is
    expected not to have exited when polled one second later.
    """
    env = os.environ.copy()
    env["SCHEDULER_ENABLED"] = "0"
    env["SCHEDULER_DISABLED_SLEEP_SECONDS"] = "30"

    # The context manager closes the stdout pipe on exit; without it the
    # unread pipe would be leaked for the rest of the test session.
    with subprocess.Popen(
        ["sh", "scripts/scheduler.sh"],
        cwd=_repo_root(),
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    ) as process:
        try:
            time.sleep(1.0)
            exit_code = process.poll()
            assert exit_code is None, (
                f"scheduler exited early with code {exit_code}"
            )
        finally:
            process.terminate()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # SIGTERM was not enough; force-kill so the child is not left
                # running and the timeout does not mask an assertion failure.
                process.kill()
                process.wait()


def test_scheduler_compose_service_is_profile_gated():
    """``docker-compose.yml`` must contain the scheduler profile and restart policy."""
    compose_text = (_repo_root() / "docker-compose.yml").read_text(encoding="utf-8")
    assert 'profiles: ["scheduler"]' in compose_text
    assert "restart: unless-stopped" in compose_text


def test_scheduler_script_declares_idle_disabled_behavior():
    """``scripts/scheduler.sh`` must mention idle mode and its sleep setting."""
    scheduler_script = (_repo_root() / "scripts/scheduler.sh").read_text(
        encoding="utf-8"
    )
    assert "Entering idle mode" in scheduler_script
    assert "SCHEDULER_DISABLED_SLEEP_SECONDS" in scheduler_script