|
| 1 | +"""Background scheduler setup for automatic staging runs.""" |
| 2 | + |
| 3 | +import logging |
| 4 | +from datetime import datetime, timedelta |
| 5 | + |
| 6 | +from fastapi_crons import Crons |
| 7 | + |
| 8 | +from database import SessionLocal |
| 9 | +from db_constraints import ( |
| 10 | + validate_single_running_staging, |
| 11 | + enforce_single_running_staging, |
| 12 | +) |
| 13 | +from discord_webhook import discord_webhook |
| 14 | +from models import ( |
| 15 | + InitiatedVia, |
| 16 | + StagingRun, |
| 17 | + StagingRunStatus, |
| 18 | +) |
| 19 | +from scheduler_config import ( |
| 20 | + SCHEDULER_AUTO_KERNEL_TREE, |
| 21 | + SCHEDULER_SKIP_WINDOW_SECONDS, |
| 22 | + SCHEDULER_USERNAME, |
| 23 | +) |
| 24 | +from scheduler_user import ensure_scheduler_user |
| 25 | + |
| 26 | +logger = logging.getLogger(__name__) |
| 27 | + |
| 28 | +CRON_EXPRESSION = "0 0,8,16 * * *" # 00:00, 08:00, 16:00 UTC |
| 29 | + |
| 30 | + |
| 31 | +async def _run_scheduled_staging() -> None: |
| 32 | + """Execute a staging run if cooldown and concurrency constraints allow it.""" |
| 33 | + db = SessionLocal() |
| 34 | + try: |
| 35 | + scheduler_user = ensure_scheduler_user(db) |
| 36 | + |
| 37 | + now = datetime.utcnow() |
| 38 | + cooldown_start = now - timedelta(seconds=SCHEDULER_SKIP_WINDOW_SECONDS) |
| 39 | + |
| 40 | + # Skip if another user triggered a run recently |
| 41 | + recent_manual_run = ( |
| 42 | + db.query(StagingRun) |
| 43 | + .filter(StagingRun.start_time >= cooldown_start) |
| 44 | + .filter(StagingRun.user_id != scheduler_user.id) |
| 45 | + .order_by(StagingRun.start_time.desc()) |
| 46 | + .first() |
| 47 | + ) |
| 48 | + if recent_manual_run: |
| 49 | + logger.info( |
| 50 | + "Skipping scheduled staging: recent run #%s by %s at %s", |
| 51 | + recent_manual_run.id, |
| 52 | + recent_manual_run.user.username, |
| 53 | + recent_manual_run.start_time, |
| 54 | + ) |
| 55 | + return |
| 56 | + |
| 57 | + # Skip if a run is in progress |
| 58 | + running_staging = validate_single_running_staging(db) |
| 59 | + if running_staging: |
| 60 | + logger.info( |
| 61 | + "Skipping scheduled staging: run #%s is currently %s", |
| 62 | + running_staging.id, |
| 63 | + running_staging.status.value, |
| 64 | + ) |
| 65 | + return |
| 66 | + |
| 67 | + staging_run = StagingRun( |
| 68 | + user_id=scheduler_user.id, |
| 69 | + status=StagingRunStatus.RUNNING, |
| 70 | + initiated_via=InitiatedVia.CRON, |
| 71 | + kernel_tree=SCHEDULER_AUTO_KERNEL_TREE, |
| 72 | + ) |
| 73 | + db.add(staging_run) |
| 74 | + db.flush() |
| 75 | + |
| 76 | + if not enforce_single_running_staging(db, staging_run.id): |
| 77 | + logger.warning( |
| 78 | + "Scheduled staging run #%s cancelled due to concurrency enforcement", |
| 79 | + staging_run.id, |
| 80 | + ) |
| 81 | + db.rollback() |
| 82 | + return |
| 83 | + |
| 84 | + db.commit() |
| 85 | + db.refresh(staging_run) |
| 86 | + logger.info("Scheduled staging run #%s started", staging_run.id) |
| 87 | + |
| 88 | + logger.info("Using virtual scheduler user '%s'", SCHEDULER_USERNAME) |
| 89 | + |
| 90 | + if discord_webhook: |
| 91 | + try: |
| 92 | + await discord_webhook.send_staging_start( |
| 93 | + SCHEDULER_USERNAME, staging_run.id |
| 94 | + ) |
| 95 | + except Exception as exc: |
| 96 | + logger.warning( |
| 97 | + "Discord notification failed for scheduler run #%s: %s", |
| 98 | + staging_run.id, |
| 99 | + exc, |
| 100 | + ) |
| 101 | + |
| 102 | + except Exception as exc: |
| 103 | + logger.error("Scheduler job failed: %s", exc) |
| 104 | + db.rollback() |
| 105 | + finally: |
| 106 | + db.close() |
| 107 | + |
| 108 | + |
| 109 | +def register_cron_jobs(crons: Crons) -> None: |
| 110 | + """Attach scheduled staging jobs to the provided cron scheduler.""" |
| 111 | + |
| 112 | + @crons.cron(CRON_EXPRESSION, name="staging_scheduler") |
| 113 | + async def scheduled_staging_job(): |
| 114 | + await _run_scheduled_staging() |
| 115 | + |
| 116 | + logger.info("Registered scheduled staging job for expression '%s'", CRON_EXPRESSION) |
0 commit comments