Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ repos:
rev: 6.0.0
hooks:
- id: flake8
# Ensure flake8 reads the project's setup.cfg (pre-commit runs hooks
# from the hook repo directory which may contain its own .flake8).
# flake8 doesn't yet support pyproject.toml for config, so point to
# setup.cfg which contains our project flake8 settings.
args: ["--config=setup.cfg"]

- repo: https://github.com/Riverside-Healthcare/djLint
rev: v1.31.1
Expand Down
44 changes: 29 additions & 15 deletions ami/jobs/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from ami.main.admin import AdminBase

from .models import Job, get_job_type_by_inferred_key
from .models import Job


@admin.register(Job)
Expand All @@ -13,14 +13,14 @@ class JobAdmin(AdminBase):

list_display = (
"name",
"job_type_key",
"status",
"task_id",
"project",
"scheduled_at",
"started_at",
"finished_at",
"duration",
"job_type_key",
"inferred_job_type",
)

@admin.action()
Expand All @@ -29,22 +29,28 @@ def enqueue_jobs(self, request: HttpRequest, queryset: QuerySet[Job]) -> None:
job.enqueue()
self.message_user(request, f"Queued {queryset.count()} job(s).")

@admin.display(description="Inferred Job Type")
def inferred_job_type(self, obj: Job) -> str:
"""
@TODO Remove this after running migration 0011_job_job_type_key.py and troubleshooting.
"""
job_type = get_job_type_by_inferred_key(obj)
return job_type.name if job_type else "Could not infer"
@admin.action()
def retry_jobs(self, request: HttpRequest, queryset: QuerySet[Job]) -> None:
for job in queryset:
job.retry(async_task=True)
self.message_user(request, f"Retried {queryset.count()} job(s).")

# return obj.job_type().name
@admin.action()
def cancel_jobs(self, request: HttpRequest, queryset: QuerySet[Job]) -> None:
for job in queryset:
job.cancel()
self.message_user(request, f"Cancelled {queryset.count()} job(s).")

actions = [enqueue_jobs]
actions = [enqueue_jobs, retry_jobs]
Copy link

Copilot AI Nov 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cancel_jobs action is defined at line 39 but not included in the actions list. Either remove the unused action method or add it to the actions list: actions = [enqueue_jobs, retry_jobs, cancel_jobs]

Suggested change
actions = [enqueue_jobs, retry_jobs]
actions = [enqueue_jobs, retry_jobs, cancel_jobs]

Copilot uses AI. Check for mistakes.

Comment on lines +44 to 45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add cancel_jobs to the actions list

cancel_jobs is defined but never registered in actions, so the new admin action will not appear in the UI. Please include it to make the cancellation action usable.

-    actions = [enqueue_jobs, retry_jobs]
+    actions = [enqueue_jobs, retry_jobs, cancel_jobs]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
actions = [enqueue_jobs, retry_jobs]
actions = [enqueue_jobs, retry_jobs, cancel_jobs]
🧰 Tools
🪛 Ruff (0.14.2)

44-44: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)

🤖 Prompt for AI Agents
In ami/jobs/admin.py around lines 44-45, the admin action list currently
contains only [enqueue_jobs, retry_jobs]; add cancel_jobs to that list so the
cancel_jobs action is registered and appears in the admin UI (e.g., change
actions = [enqueue_jobs, retry_jobs] to actions = [enqueue_jobs, retry_jobs,
cancel_jobs]). Ensure no trailing commas or syntax errors are introduced.

exclude = (
# This takes too long to load in the admin panel
autocomplete_fields = (
"source_image_collection",
"source_image_single",
# These are read-only fields
"pipeline",
"project",
)

readonly_fields = (
"task_id",
"scheduled_at",
"started_at",
Expand All @@ -54,3 +60,11 @@ def inferred_job_type(self, obj: Job) -> str:
"progress",
"result",
)

list_filter = (
"status",
"job_type_key",
"project",
"source_image_collection",
"pipeline",
)
19 changes: 19 additions & 0 deletions ami/jobs/migrations/0019_job_last_checked_at.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Generated by Django 4.2.10 on 2025-10-30 19:54

from django.db import migrations, models


class Migration(migrations.Migration):
    # Schema migration: adds Job.last_checked_at, used by the periodic
    # status-checking task (see the follow-up data migration that registers
    # "jobs.check_incomplete_jobs") to record when a job was last polled.
    dependencies = [
        ("jobs", "0018_alter_job_job_type_key"),
    ]

    operations = [
        migrations.AddField(
            model_name="job",
            name="last_checked_at",
            # Nullable/blank on purpose: existing rows have never been
            # checked, and NULL distinguishes "never checked" from any
            # real timestamp.
            field=models.DateTimeField(
                blank=True, help_text="Last time job status was checked by periodic task", null=True
            ),
        ),
    ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Generated manually for periodic job status checking

from django.db import migrations


def create_periodic_task(apps, schema_editor):
    """Create periodic task to check unfinished jobs every 3 minutes."""
    # Best-effort: if django_celery_beat is missing or the beat tables are
    # unavailable, warn and continue rather than failing the migration.
    # (apps/schema_editor are the standard RunPython arguments; unused here.)
    try:
        from django_celery_beat.models import IntervalSchedule, PeriodicTask

        # Reuse an existing 3-minute interval schedule if one exists.
        schedule, _created = IntervalSchedule.objects.get_or_create(
            every=3,
            period=IntervalSchedule.MINUTES,
        )

        # Only the defaults are applied on first creation; an existing task
        # with this name is left untouched.
        task_defaults = {
            "task": "ami.jobs.tasks.check_incomplete_jobs",
            "interval": schedule,
            "enabled": True,
            "description": "Check status of unfinished jobs and update if tasks disappeared",
        }
        PeriodicTask.objects.get_or_create(
            name="jobs.check_incomplete_jobs",
            defaults=task_defaults,
        )
    except Exception as exc:
        print(f"Warning: Could not create periodic task: {exc}")
        print("You may need to create it manually in the Django admin or via shell.")


def delete_periodic_task(apps, schema_editor):
    """Delete the periodic task if rolling back."""
    # Mirror of create_periodic_task: best-effort cleanup that must not
    # block a reverse migration if django_celery_beat is unavailable.
    try:
        from django_celery_beat.models import PeriodicTask

        stale_tasks = PeriodicTask.objects.filter(name="jobs.check_incomplete_jobs")
        stale_tasks.delete()
    except Exception as exc:
        print(f"Warning: Could not delete periodic task: {exc}")


class Migration(migrations.Migration):
    # Data migration: registers the "jobs.check_incomplete_jobs" beat task
    # (every 3 minutes) and removes it again on rollback. Both directions
    # are best-effort — see create_periodic_task/delete_periodic_task above.
    dependencies = [
        ("jobs", "0019_job_last_checked_at"),
    ]

    operations = [
        # Forward: create the PeriodicTask; reverse: delete it.
        migrations.RunPython(create_periodic_task, delete_periodic_task),
    ]
41 changes: 37 additions & 4 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,11 +722,22 @@ class Job(BaseModel):
# Hide old failed jobs after 3 days
FAILED_CUTOFF_HOURS = 24 * 3

# Job status checking configuration constants
NO_TASK_ID_TIMEOUT_SECONDS = 300 # 5 minutes
DISAPPEARED_TASK_RETRY_THRESHOLD_SECONDS = 300 # 5 minutes
MAX_JOB_RUNTIME_SECONDS = 7 * 24 * 60 * 60 # 7 days
STUCK_PENDING_TIMEOUT_SECONDS = 600 # 10 minutes

Copy link

Copilot AI Nov 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing required constants STUCK_PENDING_NO_WORKERS_TIMEOUT_SECONDS and PENDING_LOG_INTERVAL_SECONDS which are referenced in ami/jobs/status.py at lines 281, 284, 286, and 287. These constants should be defined in the Job model alongside other timeout constants.

Suggested change
STUCK_PENDING_NO_WORKERS_TIMEOUT_SECONDS = 600 # 10 minutes
PENDING_LOG_INTERVAL_SECONDS = 60 # 1 minute

Copilot uses AI. Check for mistakes.
name = models.CharField(max_length=255)
queue = models.CharField(max_length=255, default="default")
scheduled_at = models.DateTimeField(null=True, blank=True)
started_at = models.DateTimeField(null=True, blank=True)
finished_at = models.DateTimeField(null=True, blank=True)
last_checked_at = models.DateTimeField(
null=True,
blank=True,
help_text="Last time job status was checked by periodic task",
)
# @TODO can we use an Enum or Pydantic model for status?
status = models.CharField(max_length=255, default=JobState.CREATED.name, choices=JobState.choices())
progress: JobProgress = SchemaField(JobProgress, default=default_job_progress)
Expand Down Expand Up @@ -813,7 +824,7 @@ def send_task():
self.started_at = None
self.finished_at = None
self.scheduled_at = datetime.datetime.now()
self.status = AsyncResult(task_id).status
self.update_status(AsyncResult(task_id).status, save=False)
self.update_progress(save=False)
self.save()

Expand Down Expand Up @@ -862,7 +873,7 @@ def retry(self, async_task=True):
self.logger.info(f"Re-running job {self}")
self.finished_at = None
self.progress.reset()
self.status = JobState.RETRY
self.update_status(JobState.RETRY, save=False)
self.save()
if async_task:
self.enqueue()
Expand All @@ -873,15 +884,15 @@ def cancel(self):
"""
Terminate the celery task.
"""
self.status = JobState.CANCELING
self.update_status(JobState.CANCELING, save=False)
self.save()
if self.task_id:
task = run_job.AsyncResult(self.task_id)
if task:
task.revoke(terminate=True)
self.save()
else:
self.status = JobState.REVOKED
self.update_status(JobState.REVOKED, save=False)
self.save()

def update_status(self, status=None, save=True):
Expand Down Expand Up @@ -909,6 +920,7 @@ def update_status(self, status=None, save=True):
def update_progress(self, save=True):
"""
Update the total aggregate progress from the progress of each stage.
Also ensure the displayed progress.summary.status is in sync with job.status.
"""
if not len(self.progress.stages):
# Need at least one stage to calculate progress
Expand All @@ -927,6 +939,10 @@ def update_progress(self, save=True):
total_progress = sum([stage.progress for stage in self.progress.stages]) / len(self.progress.stages)

self.progress.summary.progress = total_progress
try:
self.progress.summary.status = JobState(self.status)
except ValueError:
self.progress.summary.status = JobState.UNKNOWN

if save:
self.save(update_progress=False)
Expand All @@ -949,6 +965,23 @@ def save(self, update_progress=True, *args, **kwargs):
if self.progress.summary.status != self.status:
logger.warning(f"Job {self} status mismatches progress: {self.progress.summary.status} != {self.status}")

def check_status(self, force: bool = False, save: bool = True) -> bool:
    """
    Check if the job's Celery task still exists and update status accordingly.

    This delegates to the status_checker module to avoid bloating this file.
    Intended to be called by the "jobs.check_incomplete_jobs" periodic task
    registered in the migrations; presumably it also updates last_checked_at
    (handled inside check_job_status — confirm in ami/jobs/status.py).

    Args:
        force: Skip the recent check time limit
        save: Save the job if status changes

    Returns:
        bool: True if job status was changed, False otherwise
    """
    # Imported lazily to avoid a circular import between models and status.
    from ami.jobs.status import check_job_status

    return check_job_status(self, force=force, save=save)

def check_custom_permission(self, user, action: str) -> bool:
job_type = self.job_type_key.lower()
if self.source_image_single:
Expand Down
Loading