From 805469da50a2174c2c9d7f56c16d40ad341b728e Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 30 Oct 2025 17:36:02 -0700 Subject: [PATCH 1/6] feat: add periodic status check for incomplete jobs (again) --- ami/jobs/admin.py | 44 ++- .../migrations/0019_job_last_checked_at.py | 19 ++ ...ate_check_unfinished_jobs_periodic_task.py | 47 +++ ami/jobs/models.py | 28 ++ ami/jobs/status_checker.py | 182 +++++++++++ ami/jobs/tasks.py | 93 ++++++ ami/jobs/tests.py | 292 ++++++++++++++++++ 7 files changed, 690 insertions(+), 15 deletions(-) create mode 100644 ami/jobs/migrations/0019_job_last_checked_at.py create mode 100644 ami/jobs/migrations/0020_create_check_unfinished_jobs_periodic_task.py create mode 100644 ami/jobs/status_checker.py diff --git a/ami/jobs/admin.py b/ami/jobs/admin.py index b5c921502..b72c8dae5 100644 --- a/ami/jobs/admin.py +++ b/ami/jobs/admin.py @@ -4,7 +4,7 @@ from ami.main.admin import AdminBase -from .models import Job, get_job_type_by_inferred_key +from .models import Job @admin.register(Job) @@ -13,14 +13,14 @@ class JobAdmin(AdminBase): list_display = ( "name", + "job_type_key", "status", "task_id", + "project", "scheduled_at", "started_at", "finished_at", "duration", - "job_type_key", - "inferred_job_type", ) @admin.action() @@ -29,22 +29,28 @@ def enqueue_jobs(self, request: HttpRequest, queryset: QuerySet[Job]) -> None: job.enqueue() self.message_user(request, f"Queued {queryset.count()} job(s).") - @admin.display(description="Inferred Job Type") - def inferred_job_type(self, obj: Job) -> str: - """ - @TODO Remove this after running migration 0011_job_job_type_key.py and troubleshooting. - """ - job_type = get_job_type_by_inferred_key(obj) - return job_type.name if job_type else "Could not infer" + @admin.action() + def retry_jobs(self, request: HttpRequest, queryset: QuerySet[Job]) -> None: + for job in queryset: + job.retry(async_task=True) + self.message_user(request, f"Retried {queryset.count()} job(s).") - # return obj.job_type().name + @admin.action() + def cancel_jobs(self, request: HttpRequest, queryset: QuerySet[Job]) -> None: + for job in queryset: + job.cancel() + self.message_user(request, f"Cancelled {queryset.count()} job(s).") - actions = [enqueue_jobs] + actions = [enqueue_jobs, retry_jobs] - exclude = ( - # This takes too long to load in the admin panel + autocomplete_fields = ( + "source_image_collection", "source_image_single", - # These are read-only fields + "pipeline", + "project", + ) + + readonly_fields = ( "task_id", "scheduled_at", "started_at", @@ -54,3 +60,11 @@ def inferred_job_type(self, obj: Job) -> str: "progress", "result", ) + + list_filter = ( + "status", + "job_type_key", + "project", + "source_image_collection", + "pipeline", + ) diff --git a/ami/jobs/migrations/0019_job_last_checked_at.py b/ami/jobs/migrations/0019_job_last_checked_at.py new file mode 100644 index 000000000..0d990bb39 --- /dev/null +++ b/ami/jobs/migrations/0019_job_last_checked_at.py @@ -0,0 +1,19 @@ +# Generated by Django 4.2.10 on 2025-10-30 19:54 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("jobs", "0018_alter_job_job_type_key"), + ] + + operations = [ + migrations.AddField( + model_name="job", + name="last_checked_at", + field=models.DateTimeField( + blank=True, help_text="Last time job status was checked by periodic task", null=True + ), + ), + ] diff --git a/ami/jobs/migrations/0020_create_check_unfinished_jobs_periodic_task.py 
b/ami/jobs/migrations/0020_create_check_unfinished_jobs_periodic_task.py new file mode 100644 index 000000000..0b8a34602 --- /dev/null +++ b/ami/jobs/migrations/0020_create_check_unfinished_jobs_periodic_task.py @@ -0,0 +1,47 @@ +# Generated manually for periodic job status checking + +from django.db import migrations + + +def create_periodic_task(apps, schema_editor): + """Create periodic task to check unfinished jobs every 3 minutes.""" + try: + from django_celery_beat.models import IntervalSchedule, PeriodicTask + + interval_schedule, _ = IntervalSchedule.objects.get_or_create( + every=3, + period=IntervalSchedule.MINUTES, + ) + + PeriodicTask.objects.get_or_create( + name="jobs.check_unfinished_jobs", + defaults={ + "task": "ami.jobs.tasks.check_unfinished_jobs", + "interval": interval_schedule, + "enabled": True, + "description": "Check status of unfinished jobs and update if tasks disappeared", + }, + ) + except Exception as e: + print(f"Warning: Could not create periodic task: {e}") + print("You may need to create it manually in the Django admin or via shell.") + + +def delete_periodic_task(apps, schema_editor): + """Delete the periodic task if rolling back.""" + try: + from django_celery_beat.models import PeriodicTask + + PeriodicTask.objects.filter(name="jobs.check_unfinished_jobs").delete() + except Exception as e: + print(f"Warning: Could not delete periodic task: {e}") + + +class Migration(migrations.Migration): + dependencies = [ + ("jobs", "0019_job_last_checked_at"), + ] + + operations = [ + migrations.RunPython(create_periodic_task, delete_periodic_task), + ] diff --git a/ami/jobs/models.py b/ami/jobs/models.py index ac0078d76..b1c0dce5e 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -722,11 +722,22 @@ class Job(BaseModel): # Hide old failed jobs after 3 days FAILED_CUTOFF_HOURS = 24 * 3 + # Job status checking configuration constants + NO_TASK_ID_TIMEOUT_SECONDS = 300 # 5 minutes + DISAPPEARED_TASK_RETRY_THRESHOLD_SECONDS = 300 # 5 minutes + MAX_JOB_RUNTIME_SECONDS = 7 * 24 * 60 * 60 # 7 days + STUCK_PENDING_TIMEOUT_SECONDS = 600 # 10 minutes + name = models.CharField(max_length=255) queue = models.CharField(max_length=255, default="default") scheduled_at = models.DateTimeField(null=True, blank=True) started_at = models.DateTimeField(null=True, blank=True) finished_at = models.DateTimeField(null=True, blank=True) + last_checked_at = models.DateTimeField( + null=True, + blank=True, + help_text="Last time job status was checked by periodic task", + ) # @TODO can we use an Enum or Pydantic model for status? status = models.CharField(max_length=255, default=JobState.CREATED.name, choices=JobState.choices()) progress: JobProgress = SchemaField(JobProgress, default=default_job_progress) @@ -949,6 +960,23 @@ def save(self, update_progress=True, *args, **kwargs): if self.progress.summary.status != self.status: logger.warning(f"Job {self} status mismatches progress: {self.progress.summary.status} != {self.status}") + def check_status(self, force: bool = False, save: bool = True) -> bool: + """ + Check if the job's Celery task still exists and update status accordingly. + + This delegates to the status_checker module to avoid bloating this file. 
+ + Args: + force: Skip the recent check time limit + save: Save the job if status changes + + Returns: + bool: True if job status was changed, False otherwise + """ + from ami.jobs.status_checker import check_job_status + + return check_job_status(self, force=force, save=save) + def check_custom_permission(self, user, action: str) -> bool: job_type = self.job_type_key.lower() if self.source_image_single: diff --git a/ami/jobs/status_checker.py b/ami/jobs/status_checker.py new file mode 100644 index 000000000..a43f7e2a9 --- /dev/null +++ b/ami/jobs/status_checker.py @@ -0,0 +1,182 @@ +""" +Job status checking utilities for monitoring and updating job states. + +This module provides functions to check if Celery tasks still exist and update +job statuses accordingly when tasks disappear, get stuck, or exceed timeouts. +""" +import datetime +import functools +import logging +import time + +from celery.result import AsyncResult +from django.utils import timezone + +from config import celery_app + +logger = logging.getLogger(__name__) + + +def check_celery_workers_available() -> tuple[bool, int]: + """ + Check if any Celery workers are currently running. + + Returns: + tuple: (workers_available: bool, worker_count: int) + """ + try: + inspect = celery_app.control.inspect() + active_workers = inspect.active() + + if active_workers is None: + return False, 0 + + worker_count = len(active_workers) + return worker_count > 0, worker_count + except Exception as e: + logger.warning(f"Failed to check for Celery workers: {e}") + return True, 0 # Fail open - assume workers might be available + + +@functools.lru_cache(maxsize=1) +def check_celery_workers_available_cached(timestamp: int) -> tuple[bool, int]: + """ + Cached version of worker availability check. + + Cache key is current minute, so results are cached for ~1 minute. + + Args: + timestamp: Current minute as int(time.time() / 60) + + Returns: + tuple: (workers_available: bool, worker_count: int) + """ + return check_celery_workers_available() + + +def check_job_status(job, force: bool = False, save: bool = True) -> bool: + """ + Check if the job's Celery task still exists and update status accordingly. + + This function handles multiple scenarios: + 1. Missing task_id - Job scheduled but never got a task ID + 2. Disappeared task - Task existed but is now gone from Celery + 3. Stuck pending - Job waiting too long with no workers + 4. Max runtime exceeded - Job running longer than allowed + 5. 
Resurrection - Job marked failed but task is actually running + + Args: + job: The Job instance to check + force: Skip the recent check time limit + save: Save the job if status changes + + Returns: + bool: True if job status was changed, False otherwise + """ + from ami.jobs.models import JobState + + now = timezone.now() + status_changed = False + + # Skip if checked recently (unless forced) + if not force and job.last_checked_at: + time_since_check = now - job.last_checked_at + if time_since_check < datetime.timedelta(minutes=2): + # Still update last_checked_at even though we're skipping + if save: + job.last_checked_at = now + job.save(update_fields=["last_checked_at"], update_progress=False) + return False + + # Update last_checked_at for this check + if save: + job.last_checked_at = now + + # Skip if job is already in a final state + if job.status in JobState.final_states(): + if save: + job.save(update_fields=["last_checked_at"], update_progress=False) + return False + + # Scenario 4: Max Runtime Exceeded (check this first - doesn't require Celery query) + if job.status == JobState.STARTED and job.started_at: + runtime = (now - job.started_at).total_seconds() + + if runtime > job.MAX_JOB_RUNTIME_SECONDS: + job.logger.error( + f"Job exceeded maximum runtime of {job.MAX_JOB_RUNTIME_SECONDS}s " + f"(running for {runtime:.0f}s). Marking as FAILURE." + ) + job.status = JobState.FAILURE + status_changed = True + if save: + job.save(update_progress=False) + return status_changed + + # Scenario 1: Missing Task ID + if not job.task_id: + if job.status == JobState.PENDING and job.scheduled_at: + time_waiting = (now - job.scheduled_at).total_seconds() + if time_waiting > job.NO_TASK_ID_TIMEOUT_SECONDS: + job.logger.error( + f"Job scheduled {time_waiting:.0f}s ago but never got a task_id. " f"Marking as FAILURE." + ) + job.status = JobState.FAILURE + status_changed = True + if save: + job.save(update_progress=False) + return status_changed + + # Scenario 2-5: Has task_id + if job.task_id: + try: + task = AsyncResult(job.task_id) + celery_status = task.status + + # Scenario 2: Disappeared Task + # If Celery says PENDING but job was STARTED, task disappeared + if celery_status == "PENDING" and job.status == JobState.STARTED: + if job.started_at: + time_since_start = (now - job.started_at).total_seconds() + if time_since_start > job.DISAPPEARED_TASK_RETRY_THRESHOLD_SECONDS: + job.logger.error( + f"Task {job.task_id} disappeared from Celery " + f"(started {time_since_start:.0f}s ago). Marking as FAILURE." + ) + job.status = JobState.FAILURE + status_changed = True + + # Scenario 5: Resurrection + # Job marked as FAILURE but task is actually running or succeeded + elif job.status == JobState.FAILURE and celery_status in ["STARTED", "SUCCESS"]: + job.logger.warning( + f"Job was marked FAILURE but task is {celery_status}. " f"Resurrecting job to match task state." 
+ ) + job.update_status(celery_status, save=False) + status_changed = True + + except Exception as e: + job.logger.warning(f"Could not check task {job.task_id} status: {e}") + + # Scenario 3: Stuck Pending (No Workers) + if job.status == JobState.PENDING and job.scheduled_at: + time_pending = (now - job.scheduled_at).total_seconds() + + if time_pending > job.STUCK_PENDING_TIMEOUT_SECONDS: + workers_available, worker_count = check_celery_workers_available_cached( + int(time.time() / 60) # Cache for 1 minute + ) + + if not workers_available: + job.logger.warning( + f"Job stuck in PENDING for {time_pending:.0f}s " + f"but no workers available (worker_count={worker_count}). " + f"Not marking as failed yet - may just be queued." + ) + # Don't mark as failed - might just be queued behind other jobs + + # Save if status changed or if we need to update last_checked_at + if save: + job.save(update_progress=False) + + return status_changed diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index b12271178..2c674e0bb 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -1,12 +1,17 @@ +import datetime import logging from celery.result import AsyncResult from celery.signals import task_failure, task_postrun, task_prerun +from celery.utils.log import get_task_logger +from django.core.cache import cache +from django.utils import timezone from ami.tasks import default_soft_time_limit, default_time_limit from config import celery_app logger = logging.getLogger(__name__) +task_logger = get_task_logger(__name__) @celery_app.task(bind=True, soft_time_limit=default_soft_time_limit, time_limit=default_time_limit) @@ -63,3 +68,91 @@ def update_job_failure(sender, task_id, exception, *args, **kwargs): job.logger.error(f'Job #{job.pk} "{job.name}" failed: {exception}') job.save() + + +@celery_app.task(soft_time_limit=300, time_limit=360) +def check_unfinished_jobs(): + """ + Periodic task to check status of all unfinished jobs. + + This task identifies jobs stuck in non-final states and verifies their + Celery tasks still exist. If tasks have disappeared or jobs have exceeded + timeouts, their status is updated accordingly. + + Uses cache-based locking to prevent duplicate execution. 
+ + Returns: + dict: Summary with counts of jobs checked, updated, and any errors + """ + from ami.jobs.models import Job, JobState + + # Configuration + LOCK_TIMEOUT_SECONDS = 300 # 5 minutes + MAX_JOBS_PER_RUN = 100 + MIN_CHECK_INTERVAL_MINUTES = 2 + + # Use cache-based locking to prevent duplicates + lock_id = "check_unfinished_jobs_lock" + + if not cache.add(lock_id, "locked", LOCK_TIMEOUT_SECONDS): + task_logger.info("check_unfinished_jobs already running, skipping") + return {"status": "skipped", "reason": "already_running"} + + try: + task_logger.info("Starting check_unfinished_jobs task") + + # Get all unfinished jobs + unfinished_jobs = Job.objects.filter(status__in=JobState.running_states()).order_by("scheduled_at") + + total_jobs = unfinished_jobs.count() + task_logger.info(f"Found {total_jobs} unfinished jobs") + + if total_jobs == 0: + return {"status": "success", "checked": 0, "updated": 0} + + # Limit to avoid overwhelming the system + if total_jobs > MAX_JOBS_PER_RUN: + task_logger.warning(f"Limiting to {MAX_JOBS_PER_RUN} jobs") + unfinished_jobs = unfinished_jobs[:MAX_JOBS_PER_RUN] + + # Filter to jobs not checked recently + now = timezone.now() + min_check_interval = datetime.timedelta(minutes=MIN_CHECK_INTERVAL_MINUTES) + + jobs_to_check = [] + for job in unfinished_jobs: + if job.last_checked_at is None: + jobs_to_check.append(job) + elif (now - job.last_checked_at) >= min_check_interval: + jobs_to_check.append(job) + + task_logger.info(f"Checking {len(jobs_to_check)} jobs needing verification") + + checked_count = 0 + updated_count = 0 + error_count = 0 + + for job in jobs_to_check: + try: + status_changed = job.check_status(force=False, save=True) + checked_count += 1 + if status_changed: + updated_count += 1 + task_logger.info(f"Updated job {job.pk} status to {job.status}") + except Exception as e: + error_count += 1 + task_logger.error(f"Error checking job {job.pk}: {e}", exc_info=True) + + result = { + "status": "success", + "total_unfinished": total_jobs, + "checked": checked_count, + "updated": updated_count, + "errors": error_count, + } + task_logger.info(f"Completed check_unfinished_jobs: {result}") + return result + + finally: + # Always release the lock + cache.delete(lock_id) diff --git a/ami/jobs/tests.py b/ami/jobs/tests.py index 64fbf23a2..b0d6f9baa 100644 --- a/ami/jobs/tests.py +++ b/ami/jobs/tests.py @@ -198,3 +198,295 @@ def test_cancel_job(self): # This cannot be tested until we have a way to cancel jobs # and a way to run async tasks in tests. pass + + +class TestJobStatusChecking(TestCase): + """ + Test the job status checking functionality. 
+ """ + + def setUp(self): + self.project = Project.objects.create(name="Status Check Test Project") + self.pipeline = Pipeline.objects.create( + name="Test ML pipeline", + description="Test ML pipeline", + ) + self.pipeline.projects.add(self.project) + + def test_check_status_missing_task_id(self): + """Test that jobs scheduled but never assigned a task_id are marked failed.""" + from datetime import timedelta + + from django.utils import timezone + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test Job - No Task ID", + status=JobState.PENDING, + scheduled_at=timezone.now() - timedelta(minutes=10), + task_id=None, + ) + + status_changed = job.check_status(force=True, save=True) + + self.assertTrue(status_changed) + self.assertEqual(job.status, JobState.FAILURE) + self.assertIsNotNone(job.last_checked_at) + # Check that error was logged + self.assertTrue(any("never got a task_id" in msg for msg in job.logs.stderr)) + + def test_check_status_not_missing_task_id_when_recent(self): + """Test that jobs without task_id but scheduled recently are not marked failed.""" + from datetime import timedelta + + from django.utils import timezone + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test Job - Recent No Task ID", + status=JobState.PENDING, + scheduled_at=timezone.now() - timedelta(minutes=1), # Only 1 minute old + task_id=None, + ) + + status_changed = job.check_status(force=True, save=True) + + self.assertFalse(status_changed) + self.assertEqual(job.status, JobState.PENDING) + + def test_check_status_disappeared_task(self): + """Test that started jobs with disappeared tasks are marked failed.""" + from datetime import timedelta + + from django.utils import timezone + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test Job - Disappeared Task", + status=JobState.STARTED, + started_at=timezone.now() - timedelta(minutes=10), + task_id="nonexistent-task-id-12345", + ) + + status_changed = job.check_status(force=True, save=True) + + self.assertTrue(status_changed) + self.assertEqual(job.status, JobState.FAILURE) + # Check that error was logged + self.assertTrue(any("disappeared from Celery" in msg for msg in job.logs.stderr)) + + def test_check_status_max_runtime_exceeded(self): + """Test that jobs running too long are marked failed.""" + from datetime import timedelta + + from django.utils import timezone + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test Job - Max Runtime", + status=JobState.STARTED, + started_at=timezone.now() - timedelta(days=8), # 8 days (max is 7) + task_id="some-task-id", + ) + + status_changed = job.check_status(force=True, save=True) + + self.assertTrue(status_changed) + self.assertEqual(job.status, JobState.FAILURE) + # Refresh from DB to get the latest logs + job.refresh_from_db() + # Check that error was logged (check both stdout and stderr) + all_logs = job.logs.stdout + job.logs.stderr + self.assertTrue( + any("exceeded maximum runtime" in msg for msg in all_logs), + f"Expected 'exceeded maximum runtime' in logs, got: {all_logs}", + ) + + def test_check_status_skip_final_states(self): + """Test that jobs in final states are not re-checked.""" + from datetime import timedelta + + from django.utils import timezone + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test Job - Already Success", + status=JobState.SUCCESS, + started_at=timezone.now() - timedelta(minutes=5), + 
finished_at=timezone.now(), + task_id="some-task-id", + ) + + status_changed = job.check_status(force=True, save=True) + + self.assertFalse(status_changed) + self.assertEqual(job.status, JobState.SUCCESS) + + def test_check_status_skip_recent_check(self): + """Test that jobs checked recently are skipped unless forced.""" + from datetime import timedelta + + from django.utils import timezone + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test Job - Recent Check", + status=JobState.STARTED, + started_at=timezone.now() - timedelta(minutes=5), + last_checked_at=timezone.now() - timedelta(seconds=30), # 30 seconds ago + task_id="some-task-id", + ) + + # Without force, should skip + status_changed = job.check_status(force=False, save=True) + self.assertFalse(status_changed) + + # With force, should check + job.started_at = timezone.now() - timedelta(days=8) + status_changed = job.check_status(force=True, save=True) + self.assertTrue(status_changed) + self.assertEqual(job.status, JobState.FAILURE) + + def test_check_status_updates_last_checked_at(self): + """Test that last_checked_at is updated even when status doesn't change.""" + from datetime import timedelta + + from django.utils import timezone + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test Job - Update Timestamp", + status=JobState.STARTED, + started_at=timezone.now() - timedelta(minutes=5), + task_id="some-task-id", + last_checked_at=None, + ) + + self.assertIsNone(job.last_checked_at) + job.check_status(force=True, save=True) + self.assertIsNotNone(job.last_checked_at) + + +class TestCheckUnfinishedJobsTask(TestCase): + """ + Test the periodic task that checks all unfinished jobs. + """ + + def setUp(self): + from django.core.cache import cache + + # Clear the lock before each test + cache.delete("check_unfinished_jobs_lock") + + self.project = Project.objects.create(name="Periodic Check Test Project") + self.pipeline = Pipeline.objects.create( + name="Test ML pipeline", + description="Test ML pipeline", + ) + self.pipeline.projects.add(self.project) + + def tearDown(self): + from django.core.cache import cache + + # Clear the lock after each test + cache.delete("check_unfinished_jobs_lock") + + def test_check_unfinished_jobs_task(self): + """Test the periodic task runs successfully.""" + from datetime import timedelta + + from django.utils import timezone + + from ami.jobs.tasks import check_unfinished_jobs + + # Create some test jobs in various states + Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Job 1 - Started", + status=JobState.STARTED, + started_at=timezone.now() - timedelta(days=8), + task_id="task-1", + ) + Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Job 2 - Success", + status=JobState.SUCCESS, + finished_at=timezone.now(), + ) # Should be skipped + Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Job 3 - Pending No Task", + status=JobState.PENDING, + scheduled_at=timezone.now() - timedelta(minutes=10), + task_id=None, + ) + + result = check_unfinished_jobs() + + self.assertEqual(result["status"], "success") + self.assertGreaterEqual(result["checked"], 2) # Should check jobs 1 and 3 + self.assertGreaterEqual(result["updated"], 2) # Should update both to FAILURE + + def test_check_unfinished_jobs_lock(self): + """Test that concurrent executions are prevented by locking.""" + from django.core.cache import cache + + from ami.jobs.tasks import 
check_unfinished_jobs + + # Set the lock manually + cache.set("check_unfinished_jobs_lock", "locked", 300) + + result = check_unfinished_jobs() + + self.assertEqual(result["status"], "skipped") + self.assertEqual(result["reason"], "already_running") + + def test_check_unfinished_jobs_no_jobs(self): + """Test the task handles empty job list gracefully.""" + from ami.jobs.tasks import check_unfinished_jobs + + result = check_unfinished_jobs() + + self.assertEqual(result["status"], "success") + self.assertEqual(result["checked"], 0) + self.assertEqual(result["updated"], 0) + + +class TestWorkerAvailability(TestCase): + """ + Test the worker availability checking functions. + """ + + def test_check_celery_workers_available(self): + """Test that worker availability check returns a tuple.""" + from ami.jobs.status_checker import check_celery_workers_available + + workers_available, worker_count = check_celery_workers_available() + + self.assertIsInstance(workers_available, bool) + self.assertIsInstance(worker_count, int) + + def test_check_celery_workers_available_cached(self): + """Test that cached version returns same result as uncached.""" + import time + + from ami.jobs.status_checker import check_celery_workers_available, check_celery_workers_available_cached + + # Clear cache + check_celery_workers_available_cached.cache_clear() + + timestamp = int(time.time() / 60) + cached_result = check_celery_workers_available_cached(timestamp) + direct_result = check_celery_workers_available() + + self.assertEqual(cached_result, direct_result) From 413eda09e515ff1caaf71ec5f5d2b8da77354281 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 31 Oct 2025 22:01:51 -0700 Subject: [PATCH 2/6] chore: rename status_checker.py to status.py and use 'incomplete' terminology MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename ami/jobs/status_checker.py → ami/jobs/status.py - Replace 'unfinished jobs' with 'incomplete jobs' throughout - Update migration file names and task names - Update test class names - Fix SourceImageCollectionAdmin missing search_fields - Add git commit guidelines to CLAUDE.md This makes terminology more accurate and sets up clean history for subsequent feature additions. 
Co-Authored-By: Claude --- ...te_check_incomplete_jobs_periodic_task.py} | 6 ++-- ami/jobs/models.py | 2 +- ami/jobs/{status_checker.py => status.py} | 0 ami/jobs/tasks.py | 26 ++++++++-------- ami/jobs/tests.py | 30 +++++++++---------- ami/main/admin.py | 1 + 6 files changed, 33 insertions(+), 32 deletions(-) rename ami/jobs/migrations/{0020_create_check_unfinished_jobs_periodic_task.py => 0020_create_check_incomplete_jobs_periodic_task.py} (88%) rename ami/jobs/{status_checker.py => status.py} (100%) diff --git a/ami/jobs/migrations/0020_create_check_unfinished_jobs_periodic_task.py b/ami/jobs/migrations/0020_create_check_incomplete_jobs_periodic_task.py similarity index 88% rename from ami/jobs/migrations/0020_create_check_unfinished_jobs_periodic_task.py rename to ami/jobs/migrations/0020_create_check_incomplete_jobs_periodic_task.py index 0b8a34602..2237ef099 100644 --- a/ami/jobs/migrations/0020_create_check_unfinished_jobs_periodic_task.py +++ b/ami/jobs/migrations/0020_create_check_incomplete_jobs_periodic_task.py @@ -14,9 +14,9 @@ def create_periodic_task(apps, schema_editor): ) PeriodicTask.objects.get_or_create( - name="jobs.check_unfinished_jobs", + name="jobs.check_incomplete_jobs", defaults={ - "task": "ami.jobs.tasks.check_unfinished_jobs", + "task": "ami.jobs.tasks.check_incomplete_jobs", "interval": interval_schedule, "enabled": True, "description": "Check status of unfinished jobs and update if tasks disappeared", @@ -32,7 +32,7 @@ def delete_periodic_task(apps, schema_editor): try: from django_celery_beat.models import PeriodicTask - PeriodicTask.objects.filter(name="jobs.check_unfinished_jobs").delete() + PeriodicTask.objects.filter(name="jobs.check_incomplete_jobs").delete() except Exception as e: print(f"Warning: Could not delete periodic task: {e}") diff --git a/ami/jobs/models.py b/ami/jobs/models.py index b1c0dce5e..b58b129ac 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -973,7 +973,7 @@ def check_status(self, force: bool = False, save: bool = True) -> bool: Returns: bool: True if job status was changed, False otherwise """ - from ami.jobs.status_checker import check_job_status + from ami.jobs.status import check_job_status return check_job_status(self, force=force, save=save) diff --git a/ami/jobs/status_checker.py b/ami/jobs/status.py similarity index 100% rename from ami/jobs/status_checker.py rename to ami/jobs/status.py diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 2c674e0bb..5dd1841ce 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -71,9 +71,9 @@ def update_job_failure(sender, task_id, exception, *args, **kwargs): @celery_app.task(soft_time_limit=300, time_limit=360) -def check_unfinished_jobs(): +def check_incomplete_jobs(): """ - Periodic task to check status of all unfinished jobs. + Periodic task to check status of all incomplete jobs. This task identifies jobs stuck in non-final states and verifies their Celery tasks still exist. 
If tasks have disappeared or jobs have exceeded @@ -92,20 +92,20 @@ def check_unfinished_jobs(): MIN_CHECK_INTERVAL_MINUTES = 2 # Use cache-based locking to prevent duplicates - lock_id = "check_unfinished_jobs_lock" + lock_id = "check_incomplete_jobs_lock" if not cache.add(lock_id, "locked", LOCK_TIMEOUT_SECONDS): - task_logger.info("check_unfinished_jobs already running, skipping") + task_logger.info("check_incomplete_jobs already running, skipping") return {"status": "skipped", "reason": "already_running"} try: - task_logger.info("Starting check_unfinished_jobs task") + task_logger.info("Starting check_incomplete_jobs task") - # Get all unfinished jobs - unfinished_jobs = Job.objects.filter(status__in=JobState.running_states()).order_by("scheduled_at") + # Get all incomplete jobs + incomplete_jobs = Job.objects.filter(status__in=JobState.running_states()).order_by("scheduled_at") - total_jobs = unfinished_jobs.count() - task_logger.info(f"Found {total_jobs} unfinished jobs") + total_jobs = incomplete_jobs.count() + task_logger.info(f"Found {total_jobs} incomplete jobs") if total_jobs == 0: return {"status": "success", "checked": 0, "updated": 0} @@ -113,14 +113,14 @@ def check_unfinished_jobs(): # Limit to avoid overwhelming the system if total_jobs > MAX_JOBS_PER_RUN: task_logger.warning(f"Limiting to {MAX_JOBS_PER_RUN} jobs") - unfinished_jobs = unfinished_jobs[:MAX_JOBS_PER_RUN] + incomplete_jobs = incomplete_jobs[:MAX_JOBS_PER_RUN] # Filter to jobs not checked recently now = timezone.now() min_check_interval = datetime.timedelta(minutes=MIN_CHECK_INTERVAL_MINUTES) jobs_to_check = [] - for job in unfinished_jobs: + for job in incomplete_jobs: if job.last_checked_at is None: jobs_to_check.append(job) elif (now - job.last_checked_at) >= min_check_interval: @@ -145,12 +145,12 @@ def check_unfinished_jobs(): result = { "status": "success", - "total_unfinished": total_jobs, + "total_incomplete": total_jobs, "checked": checked_count, "updated": updated_count, "errors": error_count, } - task_logger.info(f"Completed check_unfinished_jobs: {result}") + task_logger.info(f"Completed check_incomplete_jobs: {result}") return result finally: diff --git a/ami/jobs/tests.py b/ami/jobs/tests.py index b0d6f9baa..f9915791f 100644 --- a/ami/jobs/tests.py +++ b/ami/jobs/tests.py @@ -374,7 +374,7 @@ def test_check_status_updates_last_checked_at(self): self.assertIsNotNone(job.last_checked_at) -class TestCheckUnfinishedJobsTask(TestCase): +class TestCheckIncompleteJobsTask(TestCase): """ Test the periodic task that checks all unfinished jobs. 
""" @@ -383,7 +383,7 @@ def setUp(self): from django.core.cache import cache # Clear the lock before each test - cache.delete("check_unfinished_jobs_lock") + cache.delete("check_incomplete_jobs_lock") self.project = Project.objects.create(name="Periodic Check Test Project") self.pipeline = Pipeline.objects.create( @@ -396,15 +396,15 @@ def tearDown(self): from django.core.cache import cache # Clear the lock after each test - cache.delete("check_unfinished_jobs_lock") + cache.delete("check_incomplete_jobs_lock") - def test_check_unfinished_jobs_task(self): + def test_check_incomplete_jobs_task(self): """Test the periodic task runs successfully.""" from datetime import timedelta from django.utils import timezone - from ami.jobs.tasks import check_unfinished_jobs + from ami.jobs.tasks import check_incomplete_jobs # Create some test jobs in various states Job.objects.create( @@ -431,31 +431,31 @@ def test_check_unfinished_jobs_task(self): task_id=None, ) - result = check_unfinished_jobs() + result = check_incomplete_jobs() self.assertEqual(result["status"], "success") self.assertGreaterEqual(result["checked"], 2) # Should check jobs 1 and 3 self.assertGreaterEqual(result["updated"], 2) # Should update both to FAILURE - def test_check_unfinished_jobs_lock(self): + def test_check_incomplete_jobs_lock(self): """Test that concurrent executions are prevented by locking.""" from django.core.cache import cache - from ami.jobs.tasks import check_unfinished_jobs + from ami.jobs.tasks import check_incomplete_jobs # Set the lock manually - cache.set("check_unfinished_jobs_lock", "locked", 300) + cache.set("check_incomplete_jobs_lock", "locked", 300) - result = check_unfinished_jobs() + result = check_incomplete_jobs() self.assertEqual(result["status"], "skipped") self.assertEqual(result["reason"], "already_running") - def test_check_unfinished_jobs_no_jobs(self): + def test_check_incomplete_jobs_no_jobs(self): """Test the task handles empty job list gracefully.""" - from ami.jobs.tasks import check_unfinished_jobs + from ami.jobs.tasks import check_incomplete_jobs - result = check_unfinished_jobs() + result = check_incomplete_jobs() self.assertEqual(result["status"], "success") self.assertEqual(result["checked"], 0) @@ -469,7 +469,7 @@ class TestWorkerAvailability(TestCase): def test_check_celery_workers_available(self): """Test that worker availability check returns a tuple.""" - from ami.jobs.status_checker import check_celery_workers_available + from ami.jobs.status import check_celery_workers_available workers_available, worker_count = check_celery_workers_available() @@ -480,7 +480,7 @@ def test_check_celery_workers_available_cached(self): """Test that cached version returns same result as uncached.""" import time - from ami.jobs.status_checker import check_celery_workers_available, check_celery_workers_available_cached + from ami.jobs.status import check_celery_workers_available, check_celery_workers_available_cached # Clear cache check_celery_workers_available_cached.cache_clear() diff --git a/ami/main/admin.py b/ami/main/admin.py index 215d8a3be..e5fc080a8 100644 --- a/ami/main/admin.py +++ b/ami/main/admin.py @@ -596,6 +596,7 @@ class SourceImageCollectionAdmin(admin.ModelAdmin[SourceImageCollection]): list_display = ("name", "image_count", "method", "kwargs", "created_at", "updated_at") list_filter = ("project",) + search_fields = ("name",) def get_queryset(self, request: HttpRequest) -> QuerySet[Any]: return super().get_queryset(request).annotate(image_count=models.Count("images")) From 
824307e6731e743f8b80552fd10e1394d7c47ebe Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 31 Oct 2025 22:59:09 -0700 Subject: [PATCH 3/6] fix: keep displayed job progress status in sync with model status --- ami/jobs/models.py | 13 +++++++++---- ami/jobs/status.py | 6 +++--- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/ami/jobs/models.py b/ami/jobs/models.py index b58b129ac..dc2f931fe 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -824,7 +824,7 @@ def send_task(): self.started_at = None self.finished_at = None self.scheduled_at = datetime.datetime.now() - self.status = AsyncResult(task_id).status + self.update_status(AsyncResult(task_id).status, save=False) self.update_progress(save=False) self.save() @@ -873,7 +873,7 @@ def retry(self, async_task=True): self.logger.info(f"Re-running job {self}") self.finished_at = None self.progress.reset() - self.status = JobState.RETRY + self.update_status(JobState.RETRY, save=False) self.save() if async_task: self.enqueue() @@ -884,7 +884,7 @@ def cancel(self): """ Terminate the celery task. """ - self.status = JobState.CANCELING + self.update_status(JobState.CANCELING, save=False) self.save() if self.task_id: task = run_job.AsyncResult(self.task_id) @@ -892,7 +892,7 @@ def cancel(self): task.revoke(terminate=True) self.save() else: - self.status = JobState.REVOKED + self.update_status(JobState.REVOKED, save=False) self.save() def update_status(self, status=None, save=True): @@ -920,6 +920,7 @@ def update_status(self, status=None, save=True): def update_progress(self, save=True): """ Update the total aggregate progress from the progress of each stage. + Also ensure the displayed progress.summary.status is in sync with job.status. """ if not len(self.progress.stages): # Need at least one stage to calculate progress @@ -938,6 +939,10 @@ def update_progress(self, save=True): total_progress = sum([stage.progress for stage in self.progress.stages]) / len(self.progress.stages) self.progress.summary.progress = total_progress + try: + self.progress.summary.status = JobState(self.status) + except ValueError: + self.progress.summary.status = JobState.UNKNOWN if save: self.save(update_progress=False) diff --git a/ami/jobs/status.py b/ami/jobs/status.py index a43f7e2a9..a11d7f940 100644 --- a/ami/jobs/status.py +++ b/ami/jobs/status.py @@ -107,7 +107,7 @@ def check_job_status(job, force: bool = False, save: bool = True) -> bool: f"Job exceeded maximum runtime of {job.MAX_JOB_RUNTIME_SECONDS}s " f"(running for {runtime:.0f}s). Marking as FAILURE." ) - job.status = JobState.FAILURE + job.update_status(JobState.FAILURE, save=False) status_changed = True if save: job.save(update_progress=False) @@ -121,7 +121,7 @@ def check_job_status(job, force: bool = False, save: bool = True) -> bool: job.logger.error( f"Job scheduled {time_waiting:.0f}s ago but never got a task_id. " f"Marking as FAILURE." ) - job.status = JobState.FAILURE + job.update_status(JobState.FAILURE, save=False) status_changed = True if save: job.save(update_progress=False) @@ -143,7 +143,7 @@ def check_job_status(job, force: bool = False, save: bool = True) -> bool: f"Task {job.task_id} disappeared from Celery " f"(started {time_since_start:.0f}s ago). Marking as FAILURE." 
) - job.status = JobState.FAILURE + job.update_status(JobState.FAILURE, save=False) status_changed = True # Scenario 5: Resurrection From b3dd017b6e8c9fb7fc15ceb0ce378129609e734c Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 31 Oct 2025 23:39:25 -0700 Subject: [PATCH 4/6] fix: ensure flake8 in pre-commit uses the same config --- .pre-commit-config.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6d0c29010..66559fa52 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -43,6 +43,11 @@ repos: rev: 6.0.0 hooks: - id: flake8 + # Ensure flake8 reads the project's setup.cfg (pre-commit runs hooks + # from the hook repo directory which may contain its own .flake8). + # flake8 doesn't yet support pyproject.toml for config, so point to + # setup.cfg which contains our project flake8 settings. + args: ["--config=setup.cfg"] - repo: https://github.com/Riverside-Healthcare/djLint rev: v1.31.1 From 296a7658917d1f23795021b2723f33d62e30ca02 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 31 Oct 2025 23:40:34 -0700 Subject: [PATCH 5/6] feat: tests for job status changes --- ami/jobs/tasks.py | 2 +- ami/jobs/tests.py | 76 +++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 5dd1841ce..8c53e2013 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -71,7 +71,7 @@ def update_job_failure(sender, task_id, exception, *args, **kwargs): @celery_app.task(soft_time_limit=300, time_limit=360) -def check_incomplete_jobs(): +def check_incomplete_jobs() -> dict[str, int | str]: """ Periodic task to check status of all incomplete jobs. diff --git a/ami/jobs/tests.py b/ami/jobs/tests.py index f9915791f..42040f7a1 100644 --- a/ami/jobs/tests.py +++ b/ami/jobs/tests.py @@ -373,6 +373,78 @@ def test_check_status_updates_last_checked_at(self): job.check_status(force=True, save=True) self.assertIsNotNone(job.last_checked_at) + def test_enqueue_syncs_status(self): + """Test that enqueue() keeps status in sync.""" + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test Job - Enqueue Sync", + ) + + # Enqueue the job + job.enqueue() + + # Status and progress should be in sync + self.assertEqual(job.status, job.progress.summary.status) + + def test_retry_syncs_status(self): + """Test that retry() keeps status in sync.""" + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + pipeline=self.pipeline, + name="Test Job - Retry Sync", + status=JobState.FAILURE, + ) + + job.retry(async_task=True) + + # Refresh to get latest status after retry() method completes + job.refresh_from_db() + + # Status should have been updated and should be in sync + # After retry() with async_task=True, it calls enqueue() which sets status to PENDING + self.assertEqual(job.status, job.progress.summary.status) + + def test_cancel_syncs_status(self): + """Test that cancel() keeps status in sync.""" + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test Job - Cancel Sync", + status=JobState.STARTED, + ) + + # Cancel the job (without task_id, so it goes to REVOKED) + job.cancel() + + # Status should be REVOKED and in sync + self.assertEqual(job.status, JobState.REVOKED) + self.assertEqual(job.progress.summary.status, JobState.REVOKED) + + def test_status_checker_syncs_status(self): + """Test that status checker keeps status in sync when marking as FAILURE.""" + 
from datetime import timedelta + + from django.utils import timezone + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Test Job - Checker Sync", + status=JobState.STARTED, + started_at=timezone.now() - timedelta(days=8), # Exceeds max runtime + task_id="some-task", + ) + + # Check status - should mark as FAILURE + status_changed = job.check_status(force=True, save=True) + + # Both should now be FAILURE and synced + self.assertTrue(status_changed) + self.assertEqual(job.status, JobState.FAILURE) + self.assertEqual(job.progress.summary.status, JobState.FAILURE) + class TestCheckIncompleteJobsTask(TestCase): """ @@ -434,8 +506,8 @@ def test_check_incomplete_jobs_task(self): result = check_incomplete_jobs() self.assertEqual(result["status"], "success") - self.assertGreaterEqual(result["checked"], 2) # Should check jobs 1 and 3 - self.assertGreaterEqual(result["updated"], 2) # Should update both to FAILURE + self.assertGreaterEqual(int(result["checked"]), 2) # Should check jobs 1 and 3 + self.assertGreaterEqual(int(result["updated"]), 2) # Should update both to FAILURE def test_check_incomplete_jobs_lock(self): """Test that concurrent executions are prevented by locking.""" From 3a090fd9a3a7923bdab28ee2242b911ed8703170 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Sat, 1 Nov 2025 00:22:39 -0700 Subject: [PATCH 6/6] feat: cleanup status checks --- ami/jobs/status.py | 434 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 338 insertions(+), 96 deletions(-) diff --git a/ami/jobs/status.py b/ami/jobs/status.py index a11d7f940..d34411f00 100644 --- a/ami/jobs/status.py +++ b/ami/jobs/status.py @@ -4,6 +4,7 @@ This module provides functions to check if Celery tasks still exist and update job statuses accordingly when tasks disappear, get stuck, or exceed timeouts. """ + import datetime import functools import logging @@ -54,20 +55,308 @@ def check_celery_workers_available_cached(timestamp: int) -> tuple[bool, int]: return check_celery_workers_available() +def _mark_as_failed(job, error_message: str, now: datetime.datetime) -> bool: + """ + Mark the job as failed. + + Args: + job: The Job instance + error_message: The error message to log + now: The current datetime to use for finished_at + + Returns: + True if status changed, False if already failed + """ + from ami.jobs.models import JobState + + if job.status == JobState.FAILURE: + return False # No change + + job.logger.error(error_message) + job.update_status(JobState.FAILURE, save=False) + job.finished_at = now + return True + + +def _check_if_resurrected(job, now: datetime.datetime) -> bool: + """ + Check if a failed job has been resurrected (Celery task is now running/completed). + + Args: + job: The Job instance + now: Current datetime + + Returns: + True if job was resurrected (status changed), False otherwise + """ + from ami.jobs.models import JobState + + if job.status != JobState.FAILURE or not job.task_id: + return False + + try: + task = AsyncResult(job.task_id) + celery_status = task.status + + # If the task is now running or succeeded, resurrect the job + if celery_status in [JobState.STARTED, JobState.SUCCESS]: + job.logger.warning( + f"Job was marked FAILURE but task is {celery_status}. Resurrecting job to match task state." 
+ ) + job.update_status(celery_status, save=False) + job.finished_at = None if celery_status == JobState.STARTED else job.finished_at + return True + + except Exception as e: + job.logger.debug(f"Could not check resurrection status: {e}") + + return False + + +def _check_missing_task_id(job, now: datetime.datetime) -> bool: + """ + Check if job was scheduled but never got a task_id. + + Args: + job: The Job instance + now: Current datetime + + Returns: + True if status changed, False otherwise + """ + from ami.jobs.models import JobState + + if job.task_id or not job.scheduled_at or job.status != JobState.PENDING: + return False + + time_since_scheduled = (now - job.scheduled_at).total_seconds() + if time_since_scheduled > job.NO_TASK_ID_TIMEOUT_SECONDS: + return _mark_as_failed( + job, + f"Job scheduled {time_since_scheduled:.0f}s ago but never got a task_id. Marking as FAILURE.", + now, + ) + return False + + +def _check_disappeared_task(job, celery_status: str | None, now: datetime.datetime) -> bool: + """ + Check if task has disappeared from Celery backend. + + Args: + job: The Job instance + celery_status: The task status (may be None) + now: Current datetime + + Returns: + True if task disappeared and job was marked failed, False otherwise + """ + from ami.jobs.models import JobState + + # Detect disappeared task: None status OR PENDING when job isn't actually pending + is_disappeared = celery_status is None or ( + celery_status == "PENDING" and job.status not in [JobState.CREATED, JobState.PENDING] + ) + + if not is_disappeared: + return False + + job.logger.warning(f"Task {job.task_id} not found in Celery backend (current job status: {job.status})") + + # If task disappeared shortly after starting, note it might be a worker crash + if job.status in JobState.running_states() and job.started_at: + time_since_start = (now - job.started_at).total_seconds() + if time_since_start < job.DISAPPEARED_TASK_RETRY_THRESHOLD_SECONDS: + job.logger.info( + f"Task {job.task_id} disappeared shortly after starting " + f"(started {time_since_start:.0f}s ago). This may indicate a worker crash." + ) + + # Mark as failed + return _mark_as_failed( + job, + f"Task {job.task_id} disappeared from Celery. Marking as FAILURE.", + now, + ) + + +def _check_status_mismatch(job, celery_status: str, now: datetime.datetime) -> bool: + """ + Check if job status doesn't match Celery task status and reconcile. + + Args: + job: The Job instance + celery_status: The status reported by Celery + now: Current datetime + + Returns: + True if status changed, False otherwise + """ + from ami.jobs.models import JobState + + if celery_status == job.status: + return False + + job.logger.warning( + f"Job status '{job.status}' doesn't match Celery task status '{celery_status}'. " f"Updating to match Celery." + ) + + old_status = job.status + job.update_status(celery_status, save=False) + + # If Celery says it's in a final state but we thought it was running + if celery_status in JobState.final_states() and old_status in JobState.running_states(): + job.finished_at = now + if celery_status in JobState.failed_states(): + job.logger.error(f"Task failed in Celery") + + return True + + +def _check_if_stale(job, task: AsyncResult | None, now: datetime.datetime) -> bool: + """ + Check if job has been running for too long and should be marked as stale. 
+ + Args: + job: The Job instance + task: The Celery AsyncResult (may be None if task query failed) + now: Current datetime + + Returns: + True if status changed, False otherwise + """ + from ami.jobs.models import JobState + + if job.status not in JobState.running_states() or not job.started_at: + return False + + time_since_start = (now - job.started_at).total_seconds() + if time_since_start <= job.MAX_JOB_RUNTIME_SECONDS: + return False + + # Job is stale - mark as failed + max_runtime_hours = job.MAX_JOB_RUNTIME_SECONDS / 3600 + _mark_as_failed( + job, + f"Job exceeded maximum runtime of {max_runtime_hours:.0f} hours " + f"(running for {time_since_start:.0f}s). Marking as FAILURE.", + now, + ) + + # Try to revoke the task if available + if task: + try: + task.revoke(terminate=True) + job.logger.info(f"Revoked stale task {job.task_id}") + except Exception as e: + job.logger.error(f"Failed to revoke stale task {job.task_id}: {e}") + + return True + + +def _check_stuck_pending(job, celery_status: str, now: datetime.datetime) -> bool: + """ + Check if task is stuck in PENDING state for too long. + + Args: + job: The Job instance + celery_status: The status reported by Celery + now: Current datetime + + Returns: + True if status changed, False otherwise + """ + from ami.jobs.models import JobState + + if celery_status != JobState.PENDING or not job.scheduled_at: + return False + + time_since_scheduled = (now - job.scheduled_at).total_seconds() + + # Check if workers are available (using cached check to avoid excessive queries) + current_minute = int(time.time() / 60) + workers_available, worker_count = check_celery_workers_available_cached(current_minute) + + # Determine timeout based on worker availability + timeout = job.STUCK_PENDING_TIMEOUT_SECONDS if workers_available else job.STUCK_PENDING_NO_WORKERS_TIMEOUT_SECONDS + + # Log periodic waiting messages (approximately every 5 minutes) + if time_since_scheduled > job.PENDING_LOG_INTERVAL_SECONDS: + # Calculate how many intervals have passed + intervals_passed = int(time_since_scheduled / job.PENDING_LOG_INTERVAL_SECONDS) + time_since_last_interval = time_since_scheduled - (intervals_passed * job.PENDING_LOG_INTERVAL_SECONDS) + + # Log if we're within the first 60 seconds of a new interval + if time_since_last_interval < 60: + if workers_available: + job.logger.warning( + f"Job has been waiting for {time_since_scheduled:.0f}s " + f"with {worker_count} worker(s) available. Task may be queued behind other jobs." + ) + else: + job.logger.error( + f"Job has been waiting for {time_since_scheduled:.0f}s. " + f"NO WORKERS RUNNING - task cannot be picked up until workers start." + ) + + # Check if timeout exceeded + if time_since_scheduled > timeout: + if workers_available: + error_message = ( + f"Job has been pending for {time_since_scheduled:.0f}s " f"with workers available. Marking as FAILURE." + ) + else: + error_message = ( + f"Job has been pending for {time_since_scheduled:.0f}s " + f"with no workers detected. Marking as FAILURE." 
+ ) + return _mark_as_failed(job, error_message, now) + + return False + + +def _should_skip_check(job, force: bool, now: datetime.datetime) -> bool: + """Check if we should skip the status check entirely.""" + if force or not job.last_checked_at: + return False + + time_since_check = now - job.last_checked_at + return time_since_check < datetime.timedelta(minutes=2) + + +def _save_job(job, status_changed: bool, save: bool): + """Save job with appropriate fields based on whether status changed.""" + if not save: + return + + fields = ["last_checked_at", "status", "progress", "finished_at"] if status_changed else ["last_checked_at"] + job.save(update_fields=fields, update_progress=False) + + +def _get_celery_task_status(job) -> tuple[AsyncResult | None, str | None]: + """Query Celery for task and its status.""" + try: + task = AsyncResult(job.task_id) + return task, task.status + except Exception as e: + job.logger.warning(f"Could not query task {job.task_id}: {e}") + return None, None + + def check_job_status(job, force: bool = False, save: bool = True) -> bool: """ Check if the job's Celery task still exists and update status accordingly. - This function handles multiple scenarios: - 1. Missing task_id - Job scheduled but never got a task ID - 2. Disappeared task - Task existed but is now gone from Celery - 3. Stuck pending - Job waiting too long with no workers - 4. Max runtime exceeded - Job running longer than allowed - 5. Resurrection - Job marked failed but task is actually running + This function handles multiple scenarios in order: + 1. Resurrection - Job marked failed but task is actually running + 2. Missing task_id - Job scheduled but never got a task ID + 3. Stale job - Job running longer than allowed + 4. Disappeared task - Task existed but is now gone from Celery + 5. Stuck pending - Job waiting too long with/without workers Args: job: The Job instance to check - force: Skip the recent check time limit + force: Skip the recent check time limit and check final states save: Save the job if status changes Returns: @@ -76,107 +365,60 @@ def check_job_status(job, force: bool = False, save: bool = True) -> bool: from ami.jobs.models import JobState now = timezone.now() - status_changed = False # Skip if checked recently (unless forced) - if not force and job.last_checked_at: - time_since_check = now - job.last_checked_at - if time_since_check < datetime.timedelta(minutes=2): - # Still update last_checked_at even though we're skipping - if save: - job.last_checked_at = now - job.save(update_fields=["last_checked_at"], update_progress=False) - return False - - # Update last_checked_at for this check - if save: + if _should_skip_check(job, force, now): job.last_checked_at = now - - # Skip if job is already in a final state - if job.status in JobState.final_states(): - if save: - job.save(update_fields=["last_checked_at"], update_progress=False) + _save_job(job, status_changed=False, save=save) return False - # Scenario 4: Max Runtime Exceeded (check this first - doesn't require Celery query) - if job.status == JobState.STARTED and job.started_at: - runtime = (now - job.started_at).total_seconds() + job.last_checked_at = now - if runtime > job.MAX_JOB_RUNTIME_SECONDS: - job.logger.error( - f"Job exceeded maximum runtime of {job.MAX_JOB_RUNTIME_SECONDS}s " - f"(running for {runtime:.0f}s). Marking as FAILURE." 
- ) - job.update_status(JobState.FAILURE, save=False) - status_changed = True - if save: - job.save(update_progress=False) - return status_changed + # Check 0: Resurrection (failed jobs that came back to life) + if not force and _check_if_resurrected(job, now): + _save_job(job, status_changed=True, save=save) + return True + + # Skip final states unless forced + if not force and job.status in JobState.final_states(): + _save_job(job, status_changed=False, save=save) + return False + + # Check 1: Missing Task ID + if _check_missing_task_id(job, now): + _save_job(job, status_changed=True, save=save) + return True - # Scenario 1: Missing Task ID if not job.task_id: - if job.status == JobState.PENDING and job.scheduled_at: - time_waiting = (now - job.scheduled_at).total_seconds() - if time_waiting > job.NO_TASK_ID_TIMEOUT_SECONDS: - job.logger.error( - f"Job scheduled {time_waiting:.0f}s ago but never got a task_id. " f"Marking as FAILURE." - ) - job.update_status(JobState.FAILURE, save=False) - status_changed = True - if save: - job.save(update_progress=False) - return status_changed - - # Scenario 2-5: Has task_id - if job.task_id: - try: - task = AsyncResult(job.task_id) - celery_status = task.status - - # Scenario 2: Disappeared Task - # If Celery says PENDING but job was STARTED, task disappeared - if celery_status == "PENDING" and job.status == JobState.STARTED: - if job.started_at: - time_since_start = (now - job.started_at).total_seconds() - if time_since_start > job.DISAPPEARED_TASK_RETRY_THRESHOLD_SECONDS: - job.logger.error( - f"Task {job.task_id} disappeared from Celery " - f"(started {time_since_start:.0f}s ago). Marking as FAILURE." - ) - job.update_status(JobState.FAILURE, save=False) - status_changed = True - - # Scenario 5: Resurrection - # Job marked as FAILURE but task is actually running or succeeded - elif job.status == JobState.FAILURE and celery_status in ["STARTED", "SUCCESS"]: - job.logger.warning( - f"Job was marked FAILURE but task is {celery_status}. " f"Resurrecting job to match task state." - ) - job.update_status(celery_status, save=False) - status_changed = True + _save_job(job, status_changed=False, save=save) + return False - except Exception as e: - job.logger.warning(f"Could not check task {job.task_id} status: {e}") + # Get Celery task and status + task, celery_status = _get_celery_task_status(job) - # Scenario 3: Stuck Pending (No Workers) - if job.status == JobState.PENDING and job.scheduled_at: - time_pending = (now - job.scheduled_at).total_seconds() + # Check 2: Stale Job + if _check_if_stale(job, task, now): + _save_job(job, status_changed=True, save=save) + return True - if time_pending > job.STUCK_PENDING_TIMEOUT_SECONDS: - workers_available, worker_count = check_celery_workers_available_cached( - int(time.time() / 60) # Cache for 1 minute - ) + # Check 3: Disappeared Task (only for non-final states) + if job.status not in JobState.final_states() and _check_disappeared_task(job, celery_status, now): + _save_job(job, status_changed=True, save=save) + return True - if not workers_available: - job.logger.warning( - f"Job stuck in PENDING for {time_pending:.0f}s " - f"but no workers available (worker_count={worker_count}). " - f"Not marking as failed yet - may just be queued." 
- ) - # Don't mark as failed - might just be queued behind other jobs + # No celery status available - can't check further + if not celery_status: + _save_job(job, status_changed=False, save=save) + return False + + # Skip PENDING status for final states (task just doesn't exist anymore) + if job.status in JobState.final_states() and celery_status == JobState.PENDING: + _save_job(job, status_changed=False, save=save) + return False - # Save if status changed or if we need to update last_checked_at - if save: - job.save(update_progress=False) + # Check 4: Stuck pending + if job.status not in JobState.final_states(): + status_changed = _check_stuck_pending(job, celery_status, now) + _save_job(job, status_changed, save=save) return status_changed
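
A minimal sketch of how the status-checking pieces introduced in this patch series might be exercised by hand, for example from a Django shell (manage.py shell) after applying the patches. It relies only on APIs shown in the diffs above (Job.check_status(), JobState.running_states(), and the check_incomplete_jobs task); the audit_incomplete_jobs helper itself is hypothetical and not part of the patch series.

    # Hypothetical audit script; assumes the modules added in this patch series.
    from ami.jobs.models import Job, JobState
    from ami.jobs.tasks import check_incomplete_jobs

    def audit_incomplete_jobs() -> None:
        """Force a status check on every incomplete job and report any changes."""
        for job in Job.objects.filter(status__in=JobState.running_states()):
            # check_status() delegates to ami.jobs.status.check_job_status()
            changed = job.check_status(force=True, save=True)
            if changed:
                print(f"Job {job.pk} is now {job.status}")

    audit_incomplete_jobs()

    # The periodic task can also be invoked synchronously (as the tests do).
    # It returns a summary dict, e.g.
    #   {"status": "success", "total_incomplete": 3, "checked": 2, "updated": 2, "errors": 0}
    # or {"status": "skipped", "reason": "already_running"} when the
    # "check_incomplete_jobs_lock" cache key is held by a concurrent run.
    print(check_incomplete_jobs())

Running the task synchronously like this bypasses Celery beat entirely, which can be useful for verifying the cache lock and the per-job two-minute check interval without waiting for the three-minute schedule created by migration 0020.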