
Conversation

Collaborator

@mihow mihow commented Oct 31, 2025

Simplified version of what's in #981 & #910

Primarily addresses jobs that are left running without progress after their Celery background task is disconnected, and fixes several other issues along the way.

  • Check incomplete jobs with a periodic task, mark them as failed when appropriate
  • Update the displayed job status more consistently
  • Update jobs admin for more visibility and troubleshooting

Summary by CodeRabbit

  • New Features

    • Added retry and cancel actions for jobs in the admin interface.
    • Introduced automatic periodic job status monitoring to detect and reconcile stuck, disappeared, or stale jobs.
    • Enhanced admin interface with improved filtering by job type, project, and pipeline; added search for source image collections.
  • Bug Fixes

    • Jobs are now automatically checked and updated when task state mismatches occur.


netlify bot commented Oct 31, 2025

Deploy Preview for antenna-preview ready!

  • 🔨 Latest commit: 100bb05
  • 🔍 Latest deploy log: https://app.netlify.com/projects/antenna-preview/deploys/6916cf06db37fd0008e9d30a
  • 😎 Deploy Preview: https://deploy-preview-1025--antenna-preview.netlify.app
Lighthouse (1 path audited)
Performance: 32 (🔴 down 6 from production)
Accessibility: 80 (no change from production)
Best Practices: 100 (no change from production)
SEO: 92 (no change from production)
PWA: 80 (no change from production)

To edit notification comments on pull requests, go to your Netlify project configuration.

Contributor

coderabbitai bot commented Oct 31, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

A new job status monitoring system is introduced, including periodic task checking, admin retry/cancel actions, status reconciliation logic with timeout thresholds, and enhanced admin filtering. A new status.py module handles Celery task state reconciliation. Migrations add a last_checked_at field and periodic task setup. Admin actions and model methods support the centralized status update workflow.
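
The migration that registers the periodic task is not shown in this view; the sketch below illustrates how such a data migration is commonly written with django-celery-beat. The scheduler library, the 3-minute interval, and the task name here are assumptions based on the walkthrough, not the actual contents of the 0020 migration.

# Illustrative sketch only, not the PR's migration. Assumes django-celery-beat
# is the scheduler backend; interval and task path are taken from the walkthrough.
from django.db import migrations


def create_periodic_task(apps, schema_editor):
    try:
        from django_celery_beat.models import IntervalSchedule, PeriodicTask
    except Exception:
        return  # Scheduler app not installed; nothing to register.

    schedule, _ = IntervalSchedule.objects.get_or_create(every=3, period="minutes")
    PeriodicTask.objects.get_or_create(
        name="Check incomplete jobs",  # hypothetical display name
        defaults={"interval": schedule, "task": "ami.jobs.tasks.check_incomplete_jobs"},
    )


def remove_periodic_task(apps, schema_editor):
    try:
        from django_celery_beat.models import PeriodicTask
    except Exception:
        return

    PeriodicTask.objects.filter(name="Check incomplete jobs").delete()


class Migration(migrations.Migration):
    dependencies = [("jobs", "0019_job_last_checked_at")]
    operations = [migrations.RunPython(create_periodic_task, remove_periodic_task)]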

Changes

  • Admin Interface Updates (ami/jobs/admin.py, ami/main/admin.py): Added retry_jobs and cancel_jobs admin actions; updated list_display to show job_type_key and project; expanded readonly_fields and list_filter; added autocomplete_fields for source_image_collection. Added search_fields to SourceImageCollectionAdmin.
  • Model & Schema Definitions (ami/jobs/models.py, ami/jobs/migrations/0019_job_last_checked_at.py, ami/jobs/migrations/0020_create_check_incomplete_jobs_periodic_task.py): Added last_checked_at DateTimeField to Job; introduced timeout configuration constants (NO_TASK_ID_TIMEOUT_SECONDS, MAX_JOB_RUNTIME_SECONDS, etc.); added check_status() method; refactored status updates to use a centralized update_status() method. Migrations create the field and set up the periodic task.
  • Job Status Checking (ami/jobs/status.py): New module providing check_job_status() and check_celery_workers_available() functions; implements comprehensive status reconciliation including task resurrection, missing task_id detection, stale job handling, disappeared task detection, and stuck-pending monitoring with worker availability awareness.
  • Periodic Task Execution (ami/jobs/tasks.py): Added check_incomplete_jobs Celery task with cache-based locking; retrieves incomplete jobs, respects the MIN_CHECK_INTERVAL_MINUTES window, calls job.check_status() for each job, and returns a summary of checked/updated/error counts.
  • Test Coverage (ami/jobs/tests.py): Added TestJobStatusChecking, TestCheckIncompleteJobsTask, and TestWorkerAvailability test classes covering status checking scenarios, task locking, worker availability detection, and state synchronization.

Sequence Diagram(s)

sequenceDiagram
    participant Scheduler as Celery Beat Scheduler
    participant Task as check_incomplete_jobs Task
    participant Cache
    participant Job
    participant StatusModule as status.check_job_status()
    participant Celery as Celery AsyncResult

    Scheduler->>Task: Execute periodic task (every 3 min)
    Task->>Cache: Attempt to acquire lock
    alt Lock acquired
        Task->>Job: Query incomplete jobs (capped at 100)
        Note over Job: Filter by last_checked_at > now - MIN_CHECK_INTERVAL
        
        loop For each job
            Task->>StatusModule: check_status(force=False, save=True)
            
            alt Worker available
                StatusModule->>Celery: Get task status via AsyncResult
                Celery-->>StatusModule: Task state (PENDING/SUCCESS/FAILURE/etc)
                StatusModule->>Job: Reconcile & update status if changed
                StatusModule->>Job: Update last_checked_at
                Note over Job: Log decision (resurrection, timeout, etc)
            else No worker available
                Note over StatusModule: Monitor stuck PENDING with extended timeout
            end
            
            StatusModule-->>Task: Return changed (bool)
            Task->>Task: Increment counters (checked/updated/errors)
        end
        
        Task-->>Scheduler: Return summary {status, totals, counts}
    else Lock held
        Task-->>Scheduler: Skip execution, return early
    end
    Task->>Cache: Release lock (finally)
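
As a reading aid for the diagram above, here is a condensed sketch of the lock-and-iterate pattern it describes. The cache key, lock timeout, and the exact definition of "incomplete" are assumptions; the real implementation lives in ami/jobs/tasks.py.

from datetime import timedelta

from celery import shared_task
from django.core.cache import cache
from django.utils import timezone


@shared_task
def check_incomplete_jobs_sketch() -> dict[str, int | str]:
    # Hypothetical lock key; only one run of the check proceeds at a time.
    lock_key = "jobs:check_incomplete_jobs:lock"
    if not cache.add(lock_key, "locked", timeout=600):
        return {"status": "skipped", "reason": "lock held by another run"}

    checked = updated = errors = 0
    try:
        from ami.jobs.models import Job, JobState

        # One plausible definition of "incomplete": still in a running state and
        # not checked within the minimum interval, capped at 100 jobs per run.
        cutoff = timezone.now() - timedelta(minutes=3)
        jobs = Job.objects.filter(status__in=JobState.running_states()).exclude(
            last_checked_at__gte=cutoff
        )[:100]
        for job in jobs:
            try:
                if job.check_status(force=False, save=True):
                    updated += 1
            except Exception:
                errors += 1
            checked += 1
    finally:
        cache.delete(lock_key)

    return {"status": "completed", "checked": checked, "updated": updated, "errors": errors}

The cache.add/cache.delete pair is a common way to get a best-effort distributed lock without extra infrastructure; if the task crashes, the timeout lets the lock expire on its own.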

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

  • ami/jobs/status.py: New module with dense logic for status reconciliation; requires careful review of edge cases (resurrection, disappeared tasks, stuck pending detection), timeout thresholds, and worker availability checks
  • ami/jobs/tasks.py: Verify cache-based locking mechanism and MIN_CHECK_INTERVAL_MINUTES filtering logic
  • Admin actions: Ensure retry_jobs and cancel_jobs handle queryset mutations correctly
  • Models & migrations: Validate that update_status() centralization is applied consistently across enqueue/retry/cancel paths and that last_checked_at synchronization is correct

Poem

🐰 Hoppy times are here, jobs now check their fate,
Periodic hops keep status up-to-date,
Retry, cancel, resume—admins have their way,
Status synced and workers watched all day! 🎉

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 79.59%, which is below the required threshold of 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
  • Title check: ✅ Passed. The title 'Add periodic status check for incomplete jobs' accurately describes the main feature introduced in the PR: a periodic task that monitors and checks incomplete job statuses.
  • Description check: ✅ Passed. The PR description includes a summary of changes, context from related PRs, and the primary objectives; all key sections of the template are covered with substantive information.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/job-status-check-simple


Comment @coderabbitai help to get the list of available commands and usage tips.

mihow and others added 5 commits October 31, 2025 22:01
…minology

- Rename ami/jobs/status_checker.py → ami/jobs/status.py
- Replace 'unfinished jobs' with 'incomplete jobs' throughout
- Update migration file names and task names
- Update test class names
- Fix SourceImageCollectionAdmin missing search_fields
- Add git commit guidelines to CLAUDE.md

This makes terminology more accurate and sets up clean history for
subsequent feature additions.

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow mihow marked this pull request as ready for review November 1, 2025 07:31
Copilot AI review requested due to automatic review settings November 1, 2025 07:31
Contributor

Copilot AI left a comment


Pull Request Overview

This pull request introduces a comprehensive job status monitoring system to track and manage Celery jobs that may become stuck or lost. The implementation adds periodic checking for incomplete jobs and handles various failure scenarios like missing task IDs, disappeared tasks, and exceeded runtimes.

Key Changes

  • Adds a new check_status() method to the Job model with supporting constants for timeout configurations
  • Implements a periodic task check_incomplete_jobs that runs every 3 minutes to monitor job health
  • Creates a new status.py module with detailed status checking logic for various failure scenarios
  • Enhances the Job admin interface with better filtering, search capabilities, and bulk actions
  • Ensures job status stays synchronized with progress tracking across status update operations (see the sketch below)
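
A minimal sketch of the synchronization idea called out in the last bullet above: route every status change through one helper so job.status and the progress summary cannot drift apart. The function bodies here are simplified illustrations; the PR implements this as update_status() and check_status() on the Job model.

from ami.jobs.status import check_job_status


def update_status(job, new_status: str, save: bool = True) -> None:
    # Change the model field and the displayed progress summary together.
    job.status = new_status
    job.progress.summary.status = new_status
    if save:
        job.save()


def check_status(job, force: bool = False, save: bool = True) -> bool:
    # Delegate reconciliation against the Celery task to ami/jobs/status.py.
    return check_job_status(job, force=force, save=save)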

Reviewed Changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.

Show a summary per file
  • ami/jobs/models.py: Adds status checking constants, the last_checked_at field, the check_status() method, and updates status management to keep job and progress in sync
  • ami/jobs/status.py: New module implementing comprehensive status checking logic with helpers for detecting stuck/disappeared jobs
  • ami/jobs/tasks.py: Adds the periodic task check_incomplete_jobs with locking to check incomplete jobs every few minutes
  • ami/jobs/tests.py: Comprehensive test coverage for status checking scenarios and periodic task behavior
  • ami/jobs/admin.py: Improves the admin interface with retry/cancel actions, better list display ordering, filters, and autocomplete
  • ami/jobs/migrations/0019_job_last_checked_at.py: Database migration to add the last_checked_at timestamp field
  • ami/jobs/migrations/0020_create_check_incomplete_jobs_periodic_task.py: Creates the periodic task for automated job monitoring
  • ami/main/admin.py: Adds search functionality for the SourceImageCollection name field
  • .pre-commit-config.yaml: Ensures flake8 uses the project configuration by explicitly pointing to setup.cfg


DISAPPEARED_TASK_RETRY_THRESHOLD_SECONDS = 300 # 5 minutes
MAX_JOB_RUNTIME_SECONDS = 7 * 24 * 60 * 60 # 7 days
STUCK_PENDING_TIMEOUT_SECONDS = 600 # 10 minutes


Copilot AI Nov 1, 2025


Missing required constants STUCK_PENDING_NO_WORKERS_TIMEOUT_SECONDS and PENDING_LOG_INTERVAL_SECONDS which are referenced in ami/jobs/status.py at lines 281, 284, 286, and 287. These constants should be defined in the Job model alongside other timeout constants.

Suggested change
STUCK_PENDING_NO_WORKERS_TIMEOUT_SECONDS = 600 # 10 minutes
PENDING_LOG_INTERVAL_SECONDS = 60 # 1 minute

self.message_user(request, f"Cancelled {queryset.count()} job(s).")

actions = [enqueue_jobs]
actions = [enqueue_jobs, retry_jobs]

Copilot AI Nov 1, 2025


The cancel_jobs action is defined at line 39 but not included in the actions list. Either remove the unused action method or add it to the actions list: actions = [enqueue_jobs, retry_jobs, cancel_jobs]

Suggested change
actions = [enqueue_jobs, retry_jobs]
actions = [enqueue_jobs, retry_jobs, cancel_jobs]

_save_job(job, status_changed=False, save=save)
return False

# Check 4: Stuck pending

Copilot AI Nov 1, 2025


The variable status_changed is only assigned within the conditional block but is used unconditionally at line 423. If the condition is false, status_changed will be undefined, causing a NameError. Initialize status_changed = False before the conditional block.

Suggested change
# Check 4: Stuck pending
# Check 4: Stuck pending
status_changed = False

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (1)
ami/jobs/status.py (1)

216-255: Use logging.exception for better error diagnostics.

Line 252 should use job.logger.exception() instead of job.logger.error() to automatically include the exception traceback in the log output.

Apply this diff:

-            job.logger.error(f"Failed to revoke stale task {job.task_id}: {e}")
+            job.logger.exception(f"Failed to revoke stale task {job.task_id}: {e}")
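
For context, the difference is that logger.exception() logs at ERROR level and automatically attaches the active traceback, so it should only be called from inside an except block. A tiny standalone illustration:

import logging

logging.basicConfig()
logger = logging.getLogger(__name__)

try:
    raise RuntimeError("simulated revoke failure")
except RuntimeError as e:
    logger.error(f"Failed to revoke task: {e}")      # message only
    logger.exception(f"Failed to revoke task: {e}")  # message plus traceback
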
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a063a4d and 3a090fd.

📒 Files selected for processing (9)
  • .pre-commit-config.yaml (1 hunks)
  • ami/jobs/admin.py (4 hunks)
  • ami/jobs/migrations/0019_job_last_checked_at.py (1 hunks)
  • ami/jobs/migrations/0020_create_check_incomplete_jobs_periodic_task.py (1 hunks)
  • ami/jobs/models.py (7 hunks)
  • ami/jobs/status.py (1 hunks)
  • ami/jobs/tasks.py (2 hunks)
  • ami/jobs/tests.py (1 hunks)
  • ami/main/admin.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
ami/jobs/tasks.py (1)
ami/jobs/models.py (6)
  • logger (1022-1031)
  • Job (719-1037)
  • JobState (27-63)
  • running_states (54-55)
  • check_status (968-983)
  • save (955-966)
ami/jobs/tests.py (3)
ami/jobs/models.py (8)
  • Job (719-1037)
  • MLJob (316-513)
  • JobState (27-63)
  • check_status (968-983)
  • save (955-966)
  • enqueue (812-829)
  • retry (869-881)
  • cancel (883-896)
ami/jobs/tasks.py (1)
  • check_incomplete_jobs (74-158)
ami/jobs/status.py (2)
  • check_celery_workers_available (21-39)
  • check_celery_workers_available_cached (43-55)
ami/jobs/status.py (1)
ami/jobs/models.py (8)
  • logger (1022-1031)
  • JobState (27-63)
  • update_status (898-918)
  • save (955-966)
  • running_states (54-55)
  • final_states (58-59)
  • failed_states (62-63)
  • update_progress (920-948)
ami/jobs/admin.py (2)
ami/jobs/models.py (3)
  • Job (719-1037)
  • retry (869-881)
  • cancel (883-896)
ami/jobs/views.py (2)
  • retry (98-109)
  • cancel (112-119)
ami/jobs/models.py (2)
ami/jobs/tasks.py (1)
  • run_job (18-35)
ami/jobs/status.py (1)
  • check_job_status (346-424)
🪛 Ruff (0.14.2)
ami/jobs/migrations/0020_create_check_incomplete_jobs_periodic_task.py

6-6: Unused function argument: apps

(ARG001)


6-6: Unused function argument: schema_editor

(ARG001)


25-25: Do not catch blind exception: Exception

(BLE001)


30-30: Unused function argument: apps

(ARG001)


30-30: Unused function argument: schema_editor

(ARG001)


36-36: Do not catch blind exception: Exception

(BLE001)


41-43: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


45-47: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)

ami/jobs/status.py

36-36: Consider moving this statement to an else block

(TRY300)


37-37: Do not catch blind exception: Exception

(BLE001)


43-43: Unused function argument: timestamp

(ARG001)


81-81: Unused function argument: now

(ARG001)


110-110: Do not catch blind exception: Exception

(BLE001)


211-211: f-string without any placeholders

Remove extraneous f prefix

(F541)


251-251: Do not catch blind exception: Exception

(BLE001)


252-252: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


340-340: Consider moving this statement to an else block

(TRY300)


341-341: Do not catch blind exception: Exception

(BLE001)

ami/jobs/admin.py

44-44: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)

ami/jobs/migrations/0019_job_last_checked_at.py

7-9: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


11-19: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)

🔇 Additional comments (10)
ami/jobs/status.py (10)

1-19: LGTM!

Module setup, imports, and documentation are well-structured.


42-56: LGTM! The timestamp parameter is used correctly.

The static analysis warning about the unused timestamp parameter is a false positive. The parameter serves as the cache key for lru_cache, enabling minute-based cache expiration—a smart pattern for this use case.


58-79: LGTM!

The helper function correctly marks jobs as failed with proper idempotency and timestamp handling.


81-114: Review the finished_at handling for SUCCESS resurrection.

Line 107 preserves the existing finished_at when a resurrected job is in SUCCESS state. However, if finished_at was never set (which is possible for a resurrected job), it should probably be set to now for SUCCESS tasks.

Consider:

-            job.finished_at = None if celery_status == JobState.STARTED else job.finished_at
+            if celery_status == JobState.STARTED:
+                job.finished_at = None
+            elif celery_status == JobState.SUCCESS and not job.finished_at:
+                job.finished_at = now

116-140: LGTM!

The missing task ID check correctly identifies jobs that were scheduled but never received a task ID within the timeout.


142-181: LGTM!

The disappeared task detection logic is thorough and includes helpful diagnostics for potential worker crashes.


257-316: LGTM!

The stuck pending detection logic is well-designed with adaptive timeouts based on worker availability and thoughtful periodic logging to aid troubleshooting.


318-325: LGTM!

The skip check logic appropriately throttles status checks to avoid excessive queries.


327-334: LGTM!

The save strategy efficiently updates only the necessary fields based on whether status changed.


336-344: LGTM!

The Celery task status query properly handles errors and returns a consistent tuple format.

Comment on lines +44 to 45
actions = [enqueue_jobs, retry_jobs]

Contributor


⚠️ Potential issue | 🟠 Major

Add cancel_jobs to the actions list

cancel_jobs is defined but never registered in actions, so the new admin action will not appear in the UI. Please include it to make the cancellation action usable.

-    actions = [enqueue_jobs, retry_jobs]
+    actions = [enqueue_jobs, retry_jobs, cancel_jobs]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
actions = [enqueue_jobs, retry_jobs]
actions = [enqueue_jobs, retry_jobs, cancel_jobs]
🧰 Tools
🪛 Ruff (0.14.2)

44-44: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)

🤖 Prompt for AI Agents
In ami/jobs/admin.py around lines 44-45, the admin action list currently
contains only [enqueue_jobs, retry_jobs]; add cancel_jobs to that list so the
cancel_jobs action is registered and appears in the admin UI (e.g., change
actions = [enqueue_jobs, retry_jobs] to actions = [enqueue_jobs, retry_jobs,
cancel_jobs]). Ensure no trailing commas or syntax errors are introduced.
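
For readers less familiar with the admin machinery under discussion, here is a generic sketch of how retry/cancel actions are typically defined and registered. The decorator descriptions and messages are illustrative; the PR defines these as methods on the Job ModelAdmin in ami/jobs/admin.py.

from django.contrib import admin


@admin.action(description="Retry selected jobs")
def retry_jobs(modeladmin, request, queryset):
    for job in queryset:
        job.retry()
    modeladmin.message_user(request, f"Retried {queryset.count()} job(s).")


@admin.action(description="Cancel selected jobs")
def cancel_jobs(modeladmin, request, queryset):
    for job in queryset:
        job.cancel()
    modeladmin.message_user(request, f"Cancelled {queryset.count()} job(s).")


# The point of the review comment above: an action only appears in the admin UI
# if it is also listed in the ModelAdmin's actions attribute, for example
# actions = [enqueue_jobs, retry_jobs, cancel_jobs].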

Comment on lines +21 to +40
def check_celery_workers_available() -> tuple[bool, int]:
    """
    Check if any Celery workers are currently running.
    Returns:
        tuple: (workers_available: bool, worker_count: int)
    """
    try:
        inspect = celery_app.control.inspect()
        active_workers = inspect.active()

        if active_workers is None:
            return False, 0

        worker_count = len(active_workers)
        return worker_count > 0, worker_count
    except Exception as e:
        logger.warning(f"Failed to check for Celery workers: {e}")
        return True, 0  # Fail open - assume workers might be available

Contributor


⚠️ Potential issue | 🟡 Minor

Consider a more consistent fail-open return value.

Line 39 returns (True, 0) on exception, which is semantically inconsistent—it indicates workers are available but provides a count of zero. This could confuse downstream consumers expecting both values to align.

Consider returning (False, -1) or (True, 1) to better represent the uncertainty, or document the specific meaning of this tuple.
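
One way to act on this suggestion while keeping the original fail-open behaviour is to reserve -1 as an explicit "count unknown" sentinel. This is a sketch of the reviewer's idea, not the code in the PR; it assumes the celery_app and logger objects already defined at module level in ami/jobs/status.py.

def check_celery_workers_available() -> tuple[bool, int]:
    """
    Return (workers_available, worker_count).

    On inspection errors the count is -1, meaning "unknown"; callers should
    ignore the count in that case but may still treat workers as available.
    """
    try:
        active_workers = celery_app.control.inspect().active()
    except Exception as e:
        logger.warning(f"Failed to check for Celery workers: {e}")
        return True, -1  # fail open, but make the unknown count explicit

    if active_workers is None:
        return False, 0
    return len(active_workers) > 0, len(active_workers)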

🧰 Tools
🪛 Ruff (0.14.2)

36-36: Consider moving this statement to an else block

(TRY300)


37-37: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
In ami/jobs/status.py around lines 21 to 40, the exception branch currently
returns (True, 0) which is semantically inconsistent (claims workers available
but count zero); change the fail-open policy to a consistent tuple such as
(False, -1) to signal "unknown/unavailable" count, update the function docstring
to document the meaning of the (-1) count on error, and keep the warning log;
ensure callers handle the -1 sentinel appropriately.

Comment on lines +183 to +214
def _check_status_mismatch(job, celery_status: str, now: datetime.datetime) -> bool:
    """
    Check if job status doesn't match Celery task status and reconcile.
    Args:
        job: The Job instance
        celery_status: The status reported by Celery
        now: Current datetime
    Returns:
        True if status changed, False otherwise
    """
    from ami.jobs.models import JobState

    if celery_status == job.status:
        return False

    job.logger.warning(
        f"Job status '{job.status}' doesn't match Celery task status '{celery_status}'. " f"Updating to match Celery."
    )

    old_status = job.status
    job.update_status(celery_status, save=False)

    # If Celery says it's in a final state but we thought it was running
    if celery_status in JobState.final_states() and old_status in JobState.running_states():
        job.finished_at = now
        if celery_status in JobState.failed_states():
            job.logger.error(f"Task failed in Celery")

    return True

Contributor


⚠️ Potential issue | 🔴 Critical

Critical: This status mismatch check is never invoked!

The _check_status_mismatch() function is defined but never called in the main check_job_status() function. This means status discrepancies between the Job model and Celery tasks won't be reconciled, which defeats a core purpose of this monitoring system.

Additionally, line 211 has an unnecessary f-string prefix.

Apply this diff to invoke the status mismatch check and fix the f-string:

+    # Check 5: Status Mismatch
+    if _check_status_mismatch(job, celery_status, now):
+        _save_job(job, status_changed=True, save=save)
+        return True
+
     # Check 4: Stuck pending

And fix the f-string:

-            job.logger.error(f"Task failed in Celery")
+            job.logger.error("Task failed in Celery")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _check_status_mismatch(job, celery_status: str, now: datetime.datetime) -> bool:
    """
    Check if job status doesn't match Celery task status and reconcile.
    Args:
        job: The Job instance
        celery_status: The status reported by Celery
        now: Current datetime
    Returns:
        True if status changed, False otherwise
    """
    from ami.jobs.models import JobState

    if celery_status == job.status:
        return False

    job.logger.warning(
        f"Job status '{job.status}' doesn't match Celery task status '{celery_status}'. " f"Updating to match Celery."
    )

    old_status = job.status
    job.update_status(celery_status, save=False)

    # If Celery says it's in a final state but we thought it was running
    if celery_status in JobState.final_states() and old_status in JobState.running_states():
        job.finished_at = now
        if celery_status in JobState.failed_states():
            job.logger.error("Task failed in Celery")

    return True
🧰 Tools
🪛 Ruff (0.14.2)

211-211: f-string without any placeholders

Remove extraneous f prefix

(F541)

🤖 Prompt for AI Agents
In ami/jobs/status.py around lines 183 to 214, the helper
_check_status_mismatch(...) is implemented but never invoked from
check_job_status(), so reconcile logic is skipped; modify check_job_status() to
call _check_status_mismatch(job, celery_status, now) at the point where Celery
task status is known (after obtaining celery_status and before deciding further
updates) and act on its boolean return (skip duplicate updates if it already
reconciled); also fix the unnecessary f-string prefix on line 211 by removing
the extra f before the string literal so it’s a normal string interpolation
expression.

Comment on lines +346 to +424
def check_job_status(job, force: bool = False, save: bool = True) -> bool:
    """
    Check if the job's Celery task still exists and update status accordingly.
    This function handles multiple scenarios in order:
    1. Resurrection - Job marked failed but task is actually running
    2. Missing task_id - Job scheduled but never got a task ID
    3. Stale job - Job running longer than allowed
    4. Disappeared task - Task existed but is now gone from Celery
    5. Stuck pending - Job waiting too long with/without workers
    Args:
        job: The Job instance to check
        force: Skip the recent check time limit and check final states
        save: Save the job if status changes
    Returns:
        bool: True if job status was changed, False otherwise
    """
    from ami.jobs.models import JobState

    now = timezone.now()

    # Skip if checked recently (unless forced)
    if _should_skip_check(job, force, now):
        job.last_checked_at = now
        _save_job(job, status_changed=False, save=save)
        return False

    job.last_checked_at = now

    # Check 0: Resurrection (failed jobs that came back to life)
    if not force and _check_if_resurrected(job, now):
        _save_job(job, status_changed=True, save=save)
        return True

    # Skip final states unless forced
    if not force and job.status in JobState.final_states():
        _save_job(job, status_changed=False, save=save)
        return False

    # Check 1: Missing Task ID
    if _check_missing_task_id(job, now):
        _save_job(job, status_changed=True, save=save)
        return True

    if not job.task_id:
        _save_job(job, status_changed=False, save=save)
        return False

    # Get Celery task and status
    task, celery_status = _get_celery_task_status(job)

    # Check 2: Stale Job
    if _check_if_stale(job, task, now):
        _save_job(job, status_changed=True, save=save)
        return True

    # Check 3: Disappeared Task (only for non-final states)
    if job.status not in JobState.final_states() and _check_disappeared_task(job, celery_status, now):
        _save_job(job, status_changed=True, save=save)
        return True

    # No celery status available - can't check further
    if not celery_status:
        _save_job(job, status_changed=False, save=save)
        return False

    # Skip PENDING status for final states (task just doesn't exist anymore)
    if job.status in JobState.final_states() and celery_status == JobState.PENDING:
        _save_job(job, status_changed=False, save=save)
        return False

    # Check 4: Stuck pending
    if job.status not in JobState.final_states():
        status_changed = _check_stuck_pending(job, celery_status, now)

    _save_job(job, status_changed, save=save)
    return status_changed
Contributor


⚠️ Potential issue | 🔴 Critical

Critical: Uninitialized variable status_changed on line 421.

Lines 420-424 have a critical bug: if job.status in JobState.final_states(), the variable status_changed is never initialized before being used on lines 423-424, causing an UnboundLocalError.

Apply this diff to initialize the variable:

     # Check 4: Stuck pending
+    status_changed = False
     if job.status not in JobState.final_states():
         status_changed = _check_stuck_pending(job, celery_status, now)

Additionally, this function should invoke _check_status_mismatch() as noted in my earlier comment.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def check_job_status(job, force: bool = False, save: bool = True) -> bool:
    """
    Check if the job's Celery task still exists and update status accordingly.
    This function handles multiple scenarios in order:
    1. Resurrection - Job marked failed but task is actually running
    2. Missing task_id - Job scheduled but never got a task ID
    3. Stale job - Job running longer than allowed
    4. Disappeared task - Task existed but is now gone from Celery
    5. Stuck pending - Job waiting too long with/without workers
    Args:
        job: The Job instance to check
        force: Skip the recent check time limit and check final states
        save: Save the job if status changes
    Returns:
        bool: True if job status was changed, False otherwise
    """
    from ami.jobs.models import JobState

    now = timezone.now()

    # Skip if checked recently (unless forced)
    if _should_skip_check(job, force, now):
        job.last_checked_at = now
        _save_job(job, status_changed=False, save=save)
        return False

    job.last_checked_at = now

    # Check 0: Resurrection (failed jobs that came back to life)
    if not force and _check_if_resurrected(job, now):
        _save_job(job, status_changed=True, save=save)
        return True

    # Skip final states unless forced
    if not force and job.status in JobState.final_states():
        _save_job(job, status_changed=False, save=save)
        return False

    # Check 1: Missing Task ID
    if _check_missing_task_id(job, now):
        _save_job(job, status_changed=True, save=save)
        return True

    if not job.task_id:
        _save_job(job, status_changed=False, save=save)
        return False

    # Get Celery task and status
    task, celery_status = _get_celery_task_status(job)

    # Check 2: Stale Job
    if _check_if_stale(job, task, now):
        _save_job(job, status_changed=True, save=save)
        return True

    # Check 3: Disappeared Task (only for non-final states)
    if job.status not in JobState.final_states() and _check_disappeared_task(job, celery_status, now):
        _save_job(job, status_changed=True, save=save)
        return True

    # No celery status available - can't check further
    if not celery_status:
        _save_job(job, status_changed=False, save=save)
        return False

    # Skip PENDING status for final states (task just doesn't exist anymore)
    if job.status in JobState.final_states() and celery_status == JobState.PENDING:
        _save_job(job, status_changed=False, save=save)
        return False

    # Check 4: Stuck pending
    status_changed = False
    if job.status not in JobState.final_states():
        status_changed = _check_stuck_pending(job, celery_status, now)

    _save_job(job, status_changed, save=save)
    return status_changed
🤖 Prompt for AI Agents
In ami/jobs/status.py around lines 346 to 424, initialize the local variable
status_changed before it's used and call _check_status_mismatch() so mismatched
states are detected: add status_changed = False prior to the "if job.status not
in JobState.final_states()" block (so it's always defined), keep the existing
assignment when non-final and calling _check_stuck_pending, and after obtaining
celery_status (or after computing status_changed) call
_check_status_mismatch(job, celery_status, now) and if it returns True set
status_changed = True before the final _save_job call.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (1)
ami/jobs/tests.py (1)

219-310: Consider extracting repeated imports and timeout constants.

Multiple test methods import timedelta and timezone at the method level, and use hardcoded timeout values (8 days, 10 minutes, etc.). These could be extracted to reduce duplication.

Apply this pattern at the class level:

 class TestJobStatusChecking(TestCase):
     """
     Test the job status checking functionality.
     """
+    
+    # Test timeout constants
+    MAX_RUNTIME_DAYS = 8
+    OLD_SCHEDULED_MINUTES = 10
+    RECENT_SCHEDULED_MINUTES = 1
 
     def setUp(self):
+        from datetime import timedelta
+        from django.utils import timezone
+        
+        self.timedelta = timedelta
+        self.timezone = timezone
+        
         self.project = Project.objects.create(name="Status Check Test Project")

Then use self.timezone.now() - self.timedelta(days=self.MAX_RUNTIME_DAYS) in tests.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3a090fd and 100bb05.

📒 Files selected for processing (1)
  • ami/jobs/tests.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/jobs/tests.py (3)
ami/jobs/models.py (7)
  • Job (719-1037)
  • JobState (27-63)
  • check_status (968-983)
  • save (955-966)
  • enqueue (812-829)
  • retry (869-881)
  • cancel (883-896)
ami/jobs/tasks.py (1)
  • check_incomplete_jobs (74-158)
ami/jobs/status.py (2)
  • check_celery_workers_available (21-39)
  • check_celery_workers_available_cached (43-55)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Redirect rules
  • GitHub Check: Header rules
  • GitHub Check: Pages changed
  • GitHub Check: test
🔇 Additional comments (4)
ami/jobs/tests.py (4)

460-461: Good practice: Proper cache cleanup in setUp/tearDown.

The test properly manages the cache lock in both setUp and tearDown, ensuring test isolation and preventing lock-related flakiness.

Also applies to: 473-474


476-513: Comprehensive test coverage for periodic task.

The test effectively validates multiple scenarios:

  • Job state transitions (STARTED → FAILURE, PENDING → FAILURE)
  • Filtering of completed jobs (SUCCESS state)
  • Result reporting with accurate counts

The test structure clearly demonstrates the expected behavior of the periodic task.


379-449: Thorough validation of status synchronization.

These tests comprehensively verify that status remains synchronized between job.status and job.progress.summary.status across key lifecycle operations (enqueue, retry, cancel, status checker). This addresses a critical consistency requirement for the job monitoring system.


560-561: No action required—decorator is present.

The verification confirms that check_celery_workers_available_cached in ami/jobs/status.py is properly decorated with @functools.lru_cache(maxsize=1). The cache_clear() call on line 561 is valid and will function correctly.

self.assertEqual(job.status, JobState.FAILURE)
self.assertIsNotNone(job.last_checked_at)
# Check that error was logged
self.assertTrue(any("never got a task_id" in msg for msg in job.logs.stderr))
Contributor


⚠️ Potential issue | 🟡 Minor

Log message validation may be brittle.

Tests validate error messages by searching for specific strings in job logs. If the exact wording changes in the implementation, these tests will break.

Consider one of these approaches:

  1. Define expected log patterns as constants in the status checking module and import them in tests
  2. Use more flexible pattern matching (e.g., regex)
  3. Check for structured log attributes instead of string matching if available

Example:

# In ami/jobs/status.py
MISSING_TASK_ID_ERROR = "never got a task_id"
DISAPPEARED_TASK_ERROR = "disappeared from Celery"

# In tests
from ami.jobs.status import MISSING_TASK_ID_ERROR
self.assertTrue(any(MISSING_TASK_ID_ERROR in msg for msg in job.logs.stderr))

Also applies to: 282-282, 306-310

🤖 Prompt for AI Agents
In ami/jobs/tests.py around lines 240, 282 and 306-310, the tests assert on
literal substrings from job.logs.stderr which is brittle; instead add canonical
log message constants (e.g., MISSING_TASK_ID_ERROR, DISAPPEARED_TASK_ERROR) into
ami/jobs/status.py and import those constants into the tests, or replace the
literal checks with configurable regex patterns imported from a central module;
update each assertion to reference the imported constant or regex and use a
single helper (e.g., any(constant in msg for msg in job.logs.stderr) or
regex.search) so tests remain stable when wording changes.

self.assertFalse(status_changed)

# With force, should check
job.started_at = timezone.now() - timedelta(days=8)
Contributor


⚠️ Potential issue | 🟡 Minor

Direct attribute modification might bypass model logic.

Line 354 directly modifies job.started_at without calling save() before the next check_status() call. This works in this test but could be misleading if model save triggers any side effects.

Consider explicitly saving after modification for clarity:

         # With force, should check
         job.started_at = timezone.now() - timedelta(days=8)
+        job.save()
         status_changed = job.check_status(force=True, save=True)
🤖 Prompt for AI Agents
In ami/jobs/tests.py around line 354, the test directly assigns job.started_at =
timezone.now() - timedelta(days=8) which mutates the model instance without
persisting potential model-side logic; update the test to persist the change by
calling job.save() (or job.save(update_fields=['started_at'])) immediately after
setting started_at so subsequent check_status() operates on a saved state, or
alternatively set the field via the ORM (e.g.,
Model.objects.filter(pk=job.pk).update(...)) and then refresh the instance with
job.refresh_from_db() before calling check_status().

Comment on lines +512 to +513
self.assertGreaterEqual(int(result["checked"]), 2) # Should check jobs 1 and 3
self.assertGreaterEqual(int(result["updated"]), 2) # Should update both to FAILURE
Contributor


⚠️ Potential issue | 🟡 Minor

Type conversion in assertions suggests return type inconsistency.

The assertions use int(result["checked"]) and int(result["updated"]), suggesting these values might not be integers. However, looking at the check_incomplete_jobs function signature (line 73 in tasks.py), it returns dict[str, int | str], which should guarantee integer values for these keys.

Verify the return type consistency and remove unnecessary conversions:

-        self.assertGreaterEqual(int(result["checked"]), 2)  # Should check jobs 1 and 3
-        self.assertGreaterEqual(int(result["updated"]), 2)  # Should update both to FAILURE
+        self.assertGreaterEqual(result["checked"], 2)  # Should check jobs 1 and 3
+        self.assertGreaterEqual(result["updated"], 2)  # Should update both to FAILURE

If the conversion is necessary due to actual runtime behavior, this indicates a type mismatch between the signature and implementation in check_incomplete_jobs that should be fixed.

🤖 Prompt for AI Agents
In ami/jobs/tests.py around lines 512 to 513, the test is coercing
result["checked"] and result["updated"] to int(), which masks a mismatch between
the function signature and runtime values; inspect ami/jobs/tasks.py
(check_incomplete_jobs implementation) and ensure it returns integers for
"checked" and "updated" (update any places building the result to use int values
and adjust the function's return typing if necessary), then remove the
unnecessary int(...) conversions from the test assertions so they assert
directly on integer values and update/add a unit test or type check to prevent
regressions.
