Add periodic status check for incomplete jobs #1025

base: main

Conversation
✅ Deploy Preview for antenna-preview ready!
> **Note:** CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

**Walkthrough**

A new job status monitoring system is introduced, including periodic task checking, admin retry/cancel actions, status reconciliation logic with timeout thresholds, and enhanced admin filtering.

**Changes**
**Sequence Diagram(s)**

```mermaid
sequenceDiagram
    participant Scheduler as Celery Beat Scheduler
    participant Task as check_incomplete_jobs Task
    participant Cache
    participant Job
    participant StatusModule as status.check_job_status()
    participant Celery as Celery AsyncResult

    Scheduler->>Task: Execute periodic task (every 3 min)
    Task->>Cache: Attempt to acquire lock
    alt Lock acquired
        Task->>Job: Query incomplete jobs (capped at 100)
        Note over Job: Filter by last_checked_at > now - MIN_CHECK_INTERVAL
        loop For each job
            Task->>StatusModule: check_status(force=False, save=True)
            alt Worker available
                StatusModule->>Celery: Get task status via AsyncResult
                Celery-->>StatusModule: Task state (PENDING/SUCCESS/FAILURE/etc)
                StatusModule->>Job: Reconcile & update status if changed
                StatusModule->>Job: Update last_checked_at
                Note over Job: Log decision (resurrection, timeout, etc)
            else No worker available
                Note over StatusModule: Monitor stuck PENDING with extended timeout
            end
            StatusModule-->>Task: Return changed (bool)
            Task->>Task: Increment counters (checked/updated/errors)
        end
        Task-->>Scheduler: Return summary {status, totals, counts}
    else Lock held
        Task-->>Scheduler: Skip execution, return early
    end
    Task->>Cache: Release lock (finally)
```
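The lock-acquire/skip/release flow in the diagram above is commonly built on an atomic cache `add` (Django's `cache.add` returns `False` if the key already exists). A minimal sketch under that assumption, using a stand-in cache and a hypothetical lock key rather than the PR's actual code:

```python
# Minimal stand-in for a cache backend with an atomic add(),
# mirroring Django's cache API (cache.add returns False if the key exists).
class DummyCache:
    def __init__(self):
        self._store = {}

    def add(self, key, value, timeout=None):
        if key in self._store:
            return False
        self._store[key] = value
        return True

    def delete(self, key):
        self._store.pop(key, None)


cache = DummyCache()
LOCK_KEY = "check_incomplete_jobs_lock"  # hypothetical key name


def check_incomplete_jobs_sketch():
    # Atomically acquire the lock; skip this run if another worker holds it.
    if not cache.add(LOCK_KEY, "locked", timeout=180):
        return {"status": "skipped"}
    try:
        checked = updated = errors = 0
        # ... iterate incomplete jobs, call check_status(...), update counters ...
        return {"status": "ok", "checked": checked, "updated": updated, "errors": errors}
    finally:
        # Always release the lock, matching the "Release lock (finally)" step above.
        cache.delete(LOCK_KEY)
```

The `finally` clause guarantees the lock is released even when an individual job check raises, so a crashed run cannot block subsequent runs past the cache timeout.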
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
**Pre-merge checks and finishing touches**

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
…minology

- Rename ami/jobs/status_checker.py → ami/jobs/status.py
- Replace 'unfinished jobs' with 'incomplete jobs' throughout
- Update migration file names and task names
- Update test class names
- Fix SourceImageCollectionAdmin missing search_fields
- Add git commit guidelines to CLAUDE.md

This makes terminology more accurate and sets up clean history for subsequent feature additions.

Co-Authored-By: Claude <noreply@anthropic.com>
Pull Request Overview
This pull request introduces a comprehensive job status monitoring system to track and manage Celery jobs that may become stuck or lost. The implementation adds periodic checking for incomplete jobs and handles various failure scenarios like missing task IDs, disappeared tasks, and exceeded runtimes.
Key Changes
- Adds a new `check_status()` method to the Job model with supporting constants for timeout configurations
- Implements a periodic task `check_incomplete_jobs` that runs every 3 minutes to monitor job health
- Creates a new `status.py` module with detailed status checking logic for various failure scenarios
- Enhances the Job admin interface with better filtering, search capabilities, and bulk actions
- Ensures job status stays synchronized with progress tracking across status update operations
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| ami/jobs/models.py | Adds status checking constants, last_checked_at field, check_status() method, and updates status management to keep job and progress in sync |
| ami/jobs/status.py | New module implementing comprehensive status checking logic with helpers for detecting stuck/disappeared jobs |
| ami/jobs/tasks.py | Adds periodic task check_incomplete_jobs with locking to check incomplete jobs every few minutes |
| ami/jobs/tests.py | Comprehensive test coverage for status checking scenarios and periodic task behavior |
| ami/jobs/admin.py | Improves admin interface with retry/cancel actions, better list display ordering, filters, and autocomplete |
| ami/jobs/migrations/0019_job_last_checked_at.py | Database migration to add the last_checked_at timestamp field |
| ami/jobs/migrations/0020_create_check_incomplete_jobs_periodic_task.py | Creates the periodic task for automated job monitoring |
| ami/main/admin.py | Adds search functionality for SourceImageCollection name field |
| .pre-commit-config.yaml | Ensures flake8 uses project configuration by explicitly pointing to setup.cfg |
```python
DISAPPEARED_TASK_RETRY_THRESHOLD_SECONDS = 300  # 5 minutes
MAX_JOB_RUNTIME_SECONDS = 7 * 24 * 60 * 60  # 7 days
STUCK_PENDING_TIMEOUT_SECONDS = 600  # 10 minutes
```
**Copilot AI** reviewed on Nov 1, 2025:
Missing required constants `STUCK_PENDING_NO_WORKERS_TIMEOUT_SECONDS` and `PENDING_LOG_INTERVAL_SECONDS`, which are referenced in ami/jobs/status.py at lines 281, 284, 286, and 287. These constants should be defined in the Job model alongside the other timeout constants:

```python
STUCK_PENDING_NO_WORKERS_TIMEOUT_SECONDS = 600  # 10 minutes
PENDING_LOG_INTERVAL_SECONDS = 60  # 1 minute
```
```python
        self.message_user(request, f"Cancelled {queryset.count()} job(s).")
```

```diff
-    actions = [enqueue_jobs]
+    actions = [enqueue_jobs, retry_jobs]
```
**Copilot AI** reviewed on Nov 1, 2025:
The `cancel_jobs` action is defined at line 39 but not included in the actions list. Either remove the unused action method or add it to the actions list:

```diff
-    actions = [enqueue_jobs, retry_jobs]
+    actions = [enqueue_jobs, retry_jobs, cancel_jobs]
```
```python
        _save_job(job, status_changed=False, save=save)
        return False

    # Check 4: Stuck pending
```
**Copilot AI** reviewed on Nov 1, 2025:
The variable `status_changed` is only assigned within the conditional block but is used unconditionally at line 423. If the condition is false, `status_changed` will be undefined, causing an `UnboundLocalError`. Initialize `status_changed = False` before the conditional block:

```diff
     # Check 4: Stuck pending
+    status_changed = False
```
Actionable comments posted: 4
🧹 Nitpick comments (1)
ami/jobs/status.py (1)

216-255: Use `logging.exception` for better error diagnostics. Line 252 should use `job.logger.exception()` instead of `job.logger.error()` to automatically include the exception traceback in the log output. Apply this diff:

```diff
-        job.logger.error(f"Failed to revoke stale task {job.task_id}: {e}")
+        job.logger.exception(f"Failed to revoke stale task {job.task_id}: {e}")
```
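The distinction is that `logger.exception()` logs at ERROR level and automatically appends the active traceback when called inside an `except` block. A small self-contained illustration (the logger name and task id here are made up):

```python
import io
import logging

# Route log output to a string buffer so the traceback is easy to inspect.
buffer = io.StringIO()
logging.basicConfig(stream=buffer, level=logging.ERROR, force=True)
logger = logging.getLogger("demo")

task_id = "abc123"  # hypothetical task id

try:
    raise ConnectionError("broker unreachable")
except ConnectionError as e:
    # logger.error(...) would record only the message;
    # logger.exception(...) also appends the full traceback.
    logger.exception(f"Failed to revoke stale task {task_id}: {e}")

output = buffer.getvalue()
```

With `logger.error()` the traceback line would be absent, which is exactly the diagnostic information the nitpick wants preserved.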
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)

- .pre-commit-config.yaml (1 hunks)
- ami/jobs/admin.py (4 hunks)
- ami/jobs/migrations/0019_job_last_checked_at.py (1 hunks)
- ami/jobs/migrations/0020_create_check_incomplete_jobs_periodic_task.py (1 hunks)
- ami/jobs/models.py (7 hunks)
- ami/jobs/status.py (1 hunks)
- ami/jobs/tasks.py (2 hunks)
- ami/jobs/tests.py (1 hunks)
- ami/main/admin.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
ami/jobs/tasks.py (1)
- ami/jobs/models.py (6): logger (1022-1031), Job (719-1037), JobState (27-63), running_states (54-55), check_status (968-983), save (955-966)

ami/jobs/tests.py (3)
- ami/jobs/models.py (8): Job (719-1037), MLJob (316-513), JobState (27-63), check_status (968-983), save (955-966), enqueue (812-829), retry (869-881), cancel (883-896)
- ami/jobs/tasks.py (1): check_incomplete_jobs (74-158)
- ami/jobs/status.py (2): check_celery_workers_available (21-39), check_celery_workers_available_cached (43-55)

ami/jobs/status.py (1)
- ami/jobs/models.py (8): logger (1022-1031), JobState (27-63), update_status (898-918), save (955-966), running_states (54-55), final_states (58-59), failed_states (62-63), update_progress (920-948)

ami/jobs/admin.py (2)
- ami/jobs/models.py (3): Job (719-1037), retry (869-881), cancel (883-896)
- ami/jobs/views.py (2): retry (98-109), cancel (112-119)

ami/jobs/models.py (2)
- ami/jobs/tasks.py (1): run_job (18-35)
- ami/jobs/status.py (1): check_job_status (346-424)
🪛 Ruff (0.14.2)
ami/jobs/migrations/0020_create_check_incomplete_jobs_periodic_task.py
6-6: Unused function argument: apps
(ARG001)
6-6: Unused function argument: schema_editor
(ARG001)
25-25: Do not catch blind exception: Exception
(BLE001)
30-30: Unused function argument: apps
(ARG001)
30-30: Unused function argument: schema_editor
(ARG001)
36-36: Do not catch blind exception: Exception
(BLE001)
41-43: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
45-47: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
ami/jobs/status.py
36-36: Consider moving this statement to an else block
(TRY300)
37-37: Do not catch blind exception: Exception
(BLE001)
43-43: Unused function argument: timestamp
(ARG001)
81-81: Unused function argument: now
(ARG001)
110-110: Do not catch blind exception: Exception
(BLE001)
211-211: f-string without any placeholders
Remove extraneous f prefix
(F541)
251-251: Do not catch blind exception: Exception
(BLE001)
252-252: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
340-340: Consider moving this statement to an else block
(TRY300)
341-341: Do not catch blind exception: Exception
(BLE001)
ami/jobs/admin.py
44-44: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
ami/jobs/migrations/0019_job_last_checked_at.py
7-9: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
11-19: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
🔇 Additional comments (10)

ami/jobs/status.py (10)

- 1-19: LGTM! Module setup, imports, and documentation are well-structured.
- 42-56: LGTM! The `timestamp` parameter is used correctly. The static analysis warning about the unused `timestamp` parameter is a false positive. The parameter serves as the cache key for `lru_cache`, enabling minute-based cache expiration, a smart pattern for this use case.
- 58-79: LGTM! The helper function correctly marks jobs as failed with proper idempotency and timestamp handling.
- 81-114: Review the `finished_at` handling for SUCCESS resurrection. Line 107 preserves the existing `finished_at` when a resurrected job is in SUCCESS state. However, if `finished_at` was never set (which is possible for a resurrected job), it should probably be set to `now` for SUCCESS tasks. Consider:

```diff
-    job.finished_at = None if celery_status == JobState.STARTED else job.finished_at
+    if celery_status == JobState.STARTED:
+        job.finished_at = None
+    elif celery_status == JobState.SUCCESS and not job.finished_at:
+        job.finished_at = now
```

- 116-140: LGTM! The missing task ID check correctly identifies jobs that were scheduled but never received a task ID within the timeout.
- 142-181: LGTM! The disappeared task detection logic is thorough and includes helpful diagnostics for potential worker crashes.
- 257-316: LGTM! The stuck pending detection logic is well-designed, with adaptive timeouts based on worker availability and thoughtful periodic logging to aid troubleshooting.
- 318-325: LGTM! The skip check logic appropriately throttles status checks to avoid excessive queries.
- 327-334: LGTM! The save strategy efficiently updates only the necessary fields based on whether status changed.
- 336-344: LGTM! The Celery task status query properly handles errors and returns a consistent tuple format.
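The throttle praised in the 318-325 comment can be sketched as a simple time-delta comparison; the interval constant, signature, and names below are assumptions for illustration, not the PR's actual definitions:

```python
import datetime

MIN_CHECK_INTERVAL_SECONDS = 60  # hypothetical value; the PR's constant may differ


def should_skip_check(last_checked_at, force, now):
    """Skip a status check when the job was checked recently, unless forced.

    A job with no prior check (last_checked_at is None) is never skipped.
    """
    if force or last_checked_at is None:
        return False
    return (now - last_checked_at).total_seconds() < MIN_CHECK_INTERVAL_SECONDS
```

This keeps the periodic task cheap: jobs already inspected within the interval are passed over, while `force=True` (e.g. an admin-triggered check) always proceeds.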
```python
    actions = [enqueue_jobs, retry_jobs]
```
**Add `cancel_jobs` to the actions list**

`cancel_jobs` is defined but never registered in `actions`, so the new admin action will not appear in the UI. Please include it to make the cancellation action usable.

```diff
-    actions = [enqueue_jobs, retry_jobs]
+    actions = [enqueue_jobs, retry_jobs, cancel_jobs]
```

📝 Committable suggestion — ‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
🧰 Tools
🪛 Ruff (0.14.2)
44-44: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
🤖 Prompt for AI Agents
In ami/jobs/admin.py around lines 44-45, the admin action list currently
contains only [enqueue_jobs, retry_jobs]; add cancel_jobs to that list so the
cancel_jobs action is registered and appears in the admin UI (e.g., change
actions = [enqueue_jobs, retry_jobs] to actions = [enqueue_jobs, retry_jobs,
cancel_jobs]). Ensure no trailing commas or syntax errors are introduced.
```python
def check_celery_workers_available() -> tuple[bool, int]:
    """
    Check if any Celery workers are currently running.

    Returns:
        tuple: (workers_available: bool, worker_count: int)
    """
    try:
        inspect = celery_app.control.inspect()
        active_workers = inspect.active()

        if active_workers is None:
            return False, 0

        worker_count = len(active_workers)
        return worker_count > 0, worker_count
    except Exception as e:
        logger.warning(f"Failed to check for Celery workers: {e}")
        return True, 0  # Fail open - assume workers might be available
```
Consider a more consistent fail-open return value.
Line 39 returns (True, 0) on exception, which is semantically inconsistent—it indicates workers are available but provides a count of zero. This could confuse downstream consumers expecting both values to align.
Consider returning (False, -1) or (True, 1) to better represent the uncertainty, or document the specific meaning of this tuple.
🧰 Tools
🪛 Ruff (0.14.2)
36-36: Consider moving this statement to an else block
(TRY300)
37-37: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In ami/jobs/status.py around lines 21 to 40, the exception branch currently
returns (True, 0) which is semantically inconsistent (claims workers available
but count zero); change the fail-open policy to a consistent tuple such as
(False, -1) to signal "unknown/unavailable" count, update the function docstring
to document the meaning of the (-1) count on error, and keep the warning log;
ensure callers handle the -1 sentinel appropriately.
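One way to adopt the review's suggestion is a `-1` sentinel for the count while keeping the fail-open `True`, so the two values can no longer be misread as "workers available, zero workers". A hedged sketch with a callable standing in for `celery_app.control.inspect().active()` (the signature is illustrative, not the PR's):

```python
def check_workers_sketch(inspect_active):
    """Return (workers_available, worker_count); count is -1 when unknown.

    `inspect_active` stands in for celery_app.control.inspect().active();
    it may raise on broker errors, or return None when no workers reply.
    """
    try:
        active = inspect_active()
    except Exception:
        # Fail open: assume workers may exist, but mark the count unknown.
        return True, -1
    if active is None:
        return False, 0
    return len(active) > 0, len(active)
```

Callers can then treat `count == -1` explicitly as "broker error, availability unknown" instead of conflating it with an empty worker pool.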
```python
def _check_status_mismatch(job, celery_status: str, now: datetime.datetime) -> bool:
    """
    Check if job status doesn't match Celery task status and reconcile.

    Args:
        job: The Job instance
        celery_status: The status reported by Celery
        now: Current datetime

    Returns:
        True if status changed, False otherwise
    """
    from ami.jobs.models import JobState

    if celery_status == job.status:
        return False

    job.logger.warning(
        f"Job status '{job.status}' doesn't match Celery task status '{celery_status}'. "
        f"Updating to match Celery."
    )

    old_status = job.status
    job.update_status(celery_status, save=False)

    # If Celery says it's in a final state but we thought it was running
    if celery_status in JobState.final_states() and old_status in JobState.running_states():
        job.finished_at = now
        if celery_status in JobState.failed_states():
            job.logger.error(f"Task failed in Celery")

    return True
```
**Critical: This status mismatch check is never invoked!**

The `_check_status_mismatch()` function is defined but never called in the main `check_job_status()` function. This means status discrepancies between the Job model and Celery tasks won't be reconciled, which defeats a core purpose of this monitoring system.

Additionally, line 211 has an unnecessary f-string prefix.

Apply this diff to invoke the status mismatch check:

```diff
+    # Check 5: Status Mismatch
+    if _check_status_mismatch(job, celery_status, now):
+        _save_job(job, status_changed=True, save=save)
+        return True
+
     # Check 4: Stuck pending
```

And fix the f-string:

```diff
-            job.logger.error(f"Task failed in Celery")
+            job.logger.error("Task failed in Celery")
```

📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements. (The committable suggestion repeats `_check_status_mismatch()` verbatim with `job.logger.error("Task failed in Celery")` in place of the f-string version.)
🧰 Tools
🪛 Ruff (0.14.2)
211-211: f-string without any placeholders
Remove extraneous f prefix
(F541)
🤖 Prompt for AI Agents
In ami/jobs/status.py around lines 183 to 214, the helper
_check_status_mismatch(...) is implemented but never invoked from
check_job_status(), so reconcile logic is skipped; modify check_job_status() to
call _check_status_mismatch(job, celery_status, now) at the point where Celery
task status is known (after obtaining celery_status and before deciding further
updates) and act on its boolean return (skip duplicate updates if it already
reconciled); also fix the unnecessary f-string prefix on line 211 by removing
the extra f before the string literal so it’s a normal string interpolation
expression.
```python
def check_job_status(job, force: bool = False, save: bool = True) -> bool:
    """
    Check if the job's Celery task still exists and update status accordingly.

    This function handles multiple scenarios in order:
    1. Resurrection - Job marked failed but task is actually running
    2. Missing task_id - Job scheduled but never got a task ID
    3. Stale job - Job running longer than allowed
    4. Disappeared task - Task existed but is now gone from Celery
    5. Stuck pending - Job waiting too long with/without workers

    Args:
        job: The Job instance to check
        force: Skip the recent check time limit and check final states
        save: Save the job if status changes

    Returns:
        bool: True if job status was changed, False otherwise
    """
    from ami.jobs.models import JobState

    now = timezone.now()

    # Skip if checked recently (unless forced)
    if _should_skip_check(job, force, now):
        job.last_checked_at = now
        _save_job(job, status_changed=False, save=save)
        return False

    job.last_checked_at = now

    # Check 0: Resurrection (failed jobs that came back to life)
    if not force and _check_if_resurrected(job, now):
        _save_job(job, status_changed=True, save=save)
        return True

    # Skip final states unless forced
    if not force and job.status in JobState.final_states():
        _save_job(job, status_changed=False, save=save)
        return False

    # Check 1: Missing Task ID
    if _check_missing_task_id(job, now):
        _save_job(job, status_changed=True, save=save)
        return True

    if not job.task_id:
        _save_job(job, status_changed=False, save=save)
        return False

    # Get Celery task and status
    task, celery_status = _get_celery_task_status(job)

    # Check 2: Stale Job
    if _check_if_stale(job, task, now):
        _save_job(job, status_changed=True, save=save)
        return True

    # Check 3: Disappeared Task (only for non-final states)
    if job.status not in JobState.final_states() and _check_disappeared_task(job, celery_status, now):
        _save_job(job, status_changed=True, save=save)
        return True

    # No celery status available - can't check further
    if not celery_status:
        _save_job(job, status_changed=False, save=save)
        return False

    # Skip PENDING status for final states (task just doesn't exist anymore)
    if job.status in JobState.final_states() and celery_status == JobState.PENDING:
        _save_job(job, status_changed=False, save=save)
        return False

    # Check 4: Stuck pending
    if job.status not in JobState.final_states():
        status_changed = _check_stuck_pending(job, celery_status, now)

    _save_job(job, status_changed, save=save)
    return status_changed
```
**Critical: Uninitialized variable `status_changed` on line 421.**

Lines 420-424 have a critical bug: if `job.status` is in `JobState.final_states()`, the variable `status_changed` is never initialized before being used on lines 423-424, causing an `UnboundLocalError`.

Apply this diff to initialize the variable:

```diff
     # Check 4: Stuck pending
+    status_changed = False
     if job.status not in JobState.final_states():
         status_changed = _check_stuck_pending(job, celery_status, now)
```

Additionally, this function should invoke `_check_status_mismatch()` as noted in the earlier comment.
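The failure mode is ordinary Python scoping: a name assigned only on one branch of a conditional is unbound on the other branch. A minimal reproduction alongside the fix (stand-in logic, not the PR's code):

```python
def check_sketch(is_final: bool) -> bool:
    # Mirrors the flawed control flow: status_changed is assigned only
    # when the job is not in a final state.
    if not is_final:
        status_changed = True  # stand-in for _check_stuck_pending(...)
    return status_changed  # UnboundLocalError when is_final is True


def check_sketch_fixed(is_final: bool) -> bool:
    status_changed = False  # initialize before the conditional
    if not is_final:
        status_changed = True
    return status_changed
```

Because `status_changed` is assigned somewhere in the function body, Python treats it as a local everywhere in that function, so the final-state path raises `UnboundLocalError` rather than falling back to any outer name.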
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements. (The committable suggestion repeats `check_job_status()` verbatim with `status_changed = False` inserted immediately before the "Check 4: Stuck pending" conditional.)
🤖 Prompt for AI Agents
In ami/jobs/status.py around lines 346 to 424, initialize the local variable
status_changed before it's used and call _check_status_mismatch() so mismatched
states are detected: add status_changed = False prior to the "if job.status not
in JobState.final_states()" block (so it's always defined), keep the existing
assignment when non-final and calling _check_stuck_pending, and after obtaining
celery_status (or after computing status_changed) call
_check_status_mismatch(job, celery_status, now) and if it returns True set
status_changed = True before the final _save_job call.
Actionable comments posted: 3
🧹 Nitpick comments (1)
ami/jobs/tests.py (1)

219-310: Consider extracting repeated imports and timeout constants. Multiple test methods import `timedelta` and `timezone` at the method level, and use hardcoded timeout values (8 days, 10 minutes, etc.). These could be extracted to reduce duplication. Apply this pattern at the class level:

```python
class TestJobStatusChecking(TestCase):
    """
    Test the job status checking functionality.
    """

    # Test timeout constants
    MAX_RUNTIME_DAYS = 8
    OLD_SCHEDULED_MINUTES = 10
    RECENT_SCHEDULED_MINUTES = 1

    def setUp(self):
        from datetime import timedelta
        from django.utils import timezone

        self.timedelta = timedelta
        self.timezone = timezone

        self.project = Project.objects.create(name="Status Check Test Project")
```

Then use `self.timezone.now() - self.timedelta(days=self.MAX_RUNTIME_DAYS)` in tests.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
ami/jobs/tests.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/jobs/tests.py (3)
ami/jobs/models.py (7)
Job (719-1037), JobState (27-63), check_status (968-983), save (955-966), enqueue (812-829), retry (869-881), cancel (883-896)
ami/jobs/tasks.py (1)
check_incomplete_jobs (74-158)
ami/jobs/status.py (2)
check_celery_workers_available (21-39), check_celery_workers_available_cached (43-55)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Redirect rules
- GitHub Check: Header rules
- GitHub Check: Pages changed
- GitHub Check: test
🔇 Additional comments (4)
ami/jobs/tests.py (4)
460-461: Good practice: proper cache cleanup in setUp/tearDown. The test properly manages the cache lock in both setUp and tearDown, ensuring test isolation and preventing lock-related flakiness.
Also applies to: 473-474
476-513: Comprehensive test coverage for the periodic task. The test effectively validates multiple scenarios:
- Job state transitions (STARTED → FAILURE, PENDING → FAILURE)
- Filtering of completed jobs (SUCCESS state)
- Result reporting with accurate counts
The test structure clearly demonstrates the expected behavior of the periodic task.
379-449: Thorough validation of status synchronization. These tests comprehensively verify that status remains synchronized between `job.status` and `job.progress.summary.status` across key lifecycle operations (enqueue, retry, cancel, status checker). This addresses a critical consistency requirement for the job monitoring system.
560-561: No action required; the decorator is present. The verification confirms that `check_celery_workers_available_cached` in `ami/jobs/status.py` is properly decorated with `@functools.lru_cache(maxsize=1)`. The `cache_clear()` call on line 561 is valid and will function correctly.
self.assertEqual(job.status, JobState.FAILURE)
self.assertIsNotNone(job.last_checked_at)
# Check that error was logged
self.assertTrue(any("never got a task_id" in msg for msg in job.logs.stderr))
Log message validation may be brittle.
Tests validate error messages by searching for specific strings in job logs. If the exact wording changes in the implementation, these tests will break.
Consider one of these approaches:
- Define expected log patterns as constants in the status checking module and import them in tests
- Use more flexible pattern matching (e.g., regex)
- Check for structured log attributes instead of string matching if available
Example:
# In ami/jobs/status.py
MISSING_TASK_ID_ERROR = "never got a task_id"
DISAPPEARED_TASK_ERROR = "disappeared from Celery"
# In tests
from ami.jobs.status import MISSING_TASK_ID_ERROR
self.assertTrue(any(MISSING_TASK_ID_ERROR in msg for msg in job.logs.stderr))

Also applies to: 282-282, 306-310
🤖 Prompt for AI Agents
In ami/jobs/tests.py around lines 240, 282 and 306-310, the tests assert on
literal substrings from job.logs.stderr which is brittle; instead add canonical
log message constants (e.g., MISSING_TASK_ID_ERROR, DISAPPEARED_TASK_ERROR) into
ami/jobs/status.py and import those constants into the tests, or replace the
literal checks with configurable regex patterns imported from a central module;
update each assertion to reference the imported constant or regex and use a
single helper (e.g., any(constant in msg for msg in job.logs.stderr) or
regex.search) so tests remain stable when wording changes.
self.assertFalse(status_changed)

# With force, should check
job.started_at = timezone.now() - timedelta(days=8)
Direct attribute modification might bypass model logic.
Line 354 directly modifies job.started_at without calling save() before the next check_status() call. This works in this test but could be misleading if model save triggers any side effects.
Consider explicitly saving after modification for clarity:
# With force, should check
job.started_at = timezone.now() - timedelta(days=8)
+ job.save()
+ status_changed = job.check_status(force=True, save=True)

🤖 Prompt for AI Agents
In ami/jobs/tests.py around line 354, the test directly assigns job.started_at =
timezone.now() - timedelta(days=8) which mutates the model instance without
persisting potential model-side logic; update the test to persist the change by
calling job.save() (or job.save(update_fields=['started_at'])) immediately after
setting started_at so subsequent check_status() operates on a saved state, or
alternatively set the field via the ORM (e.g.,
Model.objects.filter(pk=job.pk).update(...)) and then refresh the instance with
job.refresh_from_db() before calling check_status().
self.assertGreaterEqual(int(result["checked"]), 2)  # Should check jobs 1 and 3
self.assertGreaterEqual(int(result["updated"]), 2)  # Should update both to FAILURE
Type conversion in assertions suggests return type inconsistency.
The assertions use int(result["checked"]) and int(result["updated"]), suggesting these values might not be integers. However, looking at the check_incomplete_jobs function signature (line 73 in tasks.py), it returns dict[str, int | str], which should guarantee integer values for these keys.
Verify the return type consistency and remove unnecessary conversions:
- self.assertGreaterEqual(int(result["checked"]), 2) # Should check jobs 1 and 3
- self.assertGreaterEqual(int(result["updated"]), 2) # Should update both to FAILURE
+ self.assertGreaterEqual(result["checked"], 2) # Should check jobs 1 and 3
+ self.assertGreaterEqual(result["updated"], 2)  # Should update both to FAILURE

If the conversion is necessary due to actual runtime behavior, this indicates a type mismatch between the signature and implementation in check_incomplete_jobs that should be fixed.
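One way to keep the declared return type honest at runtime is to coerce once at the point where the payload is built. The function name and keys below follow the review's description of the result dict, not the PR's exact code:

```python
def build_check_result(checked, updated, errors):
    """Assemble the periodic task's result payload with explicit ints,
    matching a declared return type of dict[str, int | str].
    (Illustrative sketch; names and keys are assumptions.)"""
    result: dict[str, int | str] = {
        "checked": int(checked),
        "updated": int(updated),
        "status": "ok" if not errors else "completed_with_errors",
    }
    return result
```

With coercion centralized here, the test assertions can compare integers directly and any non-numeric value fed into the counters fails loudly at the source rather than in a distant assertion.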
🤖 Prompt for AI Agents
In ami/jobs/tests.py around lines 512 to 513, the test is coercing
result["checked"] and result["updated"] to int(), which masks a mismatch between
the function signature and runtime values; inspect ami/jobs/tasks.py
(check_incomplete_jobs implementation) and ensure it returns integers for
"checked" and "updated" (update any places building the result to use int values
and adjust the function's return typing if necessary), then remove the
unnecessary int(...) conversions from the test assertions so they assert
directly on integer values and update/add a unit test or type check to prevent
regressions.

Simplified version of what's in #981 & #910
Primarily addresses jobs where the Celery background task becomes disconnected and the job is left running without progress, but it also fixes other issues along the way.
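The core reconciliation idea can be sketched as a pure function. The state names match Celery's standard task states, but the threshold and decision rules here are illustrative assumptions, not the PR's exact values:

```python
from datetime import datetime, timedelta, timezone

FINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}
MAX_RUNTIME = timedelta(days=7)  # assumed cutoff for a stuck job

def reconcile_status(job_status, celery_status, started_at, now):
    """Decide a job's next status from Celery's view of its task.
    A sketch of the reconciliation idea only; thresholds are assumed."""
    if job_status in FINAL_STATES and celery_status == "PENDING":
        # Celery reports PENDING for unknown task ids, so a finished job
        # whose result has expired from the backend keeps its own state.
        return job_status
    if celery_status in FINAL_STATES:
        # Trust Celery's terminal state over a stale job record.
        return celery_status
    if now - started_at > MAX_RUNTIME:
        # Running far past the allowed window: fail it so it can be retried.
        return "FAILURE"
    return job_status
```

The PENDING special case is the crux: because Celery cannot distinguish "not started yet" from "task id it has never heard of", a finished job must not be resurrected just because its result expired.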
Summary by CodeRabbit
New Features
Bug Fixes