-
Notifications
You must be signed in to change notification settings - Fork 11
Add periodic status check for incomplete jobs #1025
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 6 commits
805469d
413eda0
824307e
b3dd017
296a765
3a090fd
100bb05
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -4,7 +4,7 @@ | |||||
|
|
||||||
| from ami.main.admin import AdminBase | ||||||
|
|
||||||
| from .models import Job, get_job_type_by_inferred_key | ||||||
| from .models import Job | ||||||
|
|
||||||
|
|
||||||
| @admin.register(Job) | ||||||
|
|
@@ -13,14 +13,14 @@ class JobAdmin(AdminBase): | |||||
|
|
||||||
| list_display = ( | ||||||
| "name", | ||||||
| "job_type_key", | ||||||
| "status", | ||||||
| "task_id", | ||||||
| "project", | ||||||
| "scheduled_at", | ||||||
| "started_at", | ||||||
| "finished_at", | ||||||
| "duration", | ||||||
| "job_type_key", | ||||||
| "inferred_job_type", | ||||||
| ) | ||||||
|
|
||||||
| @admin.action() | ||||||
|
|
@@ -29,22 +29,28 @@ def enqueue_jobs(self, request: HttpRequest, queryset: QuerySet[Job]) -> None: | |||||
| job.enqueue() | ||||||
| self.message_user(request, f"Queued {queryset.count()} job(s).") | ||||||
|
|
||||||
| @admin.display(description="Inferred Job Type") | ||||||
| def inferred_job_type(self, obj: Job) -> str: | ||||||
| """ | ||||||
| @TODO Remove this after running migration 0011_job_job_type_key.py and troubleshooting. | ||||||
| """ | ||||||
| job_type = get_job_type_by_inferred_key(obj) | ||||||
| return job_type.name if job_type else "Could not infer" | ||||||
| @admin.action() | ||||||
| def retry_jobs(self, request: HttpRequest, queryset: QuerySet[Job]) -> None: | ||||||
| for job in queryset: | ||||||
| job.retry(async_task=True) | ||||||
| self.message_user(request, f"Retried {queryset.count()} job(s).") | ||||||
|
|
||||||
| # return obj.job_type().name | ||||||
| @admin.action() | ||||||
| def cancel_jobs(self, request: HttpRequest, queryset: QuerySet[Job]) -> None: | ||||||
| for job in queryset: | ||||||
| job.cancel() | ||||||
| self.message_user(request, f"Cancelled {queryset.count()} job(s).") | ||||||
|
|
||||||
| actions = [enqueue_jobs] | ||||||
| actions = [enqueue_jobs, retry_jobs] | ||||||
|
|
||||||
|
Comment on lines
+44
to
45
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add
- actions = [enqueue_jobs, retry_jobs]
+ actions = [enqueue_jobs, retry_jobs, cancel_jobs]📝 Committable suggestion
Suggested change
🧰 Tools🪛 Ruff (0.14.2)44-44: Mutable class attributes should be annotated with (RUF012) 🤖 Prompt for AI Agents |
||||||
| exclude = ( | ||||||
| # This takes too long to load in the admin panel | ||||||
| autocomplete_fields = ( | ||||||
| "source_image_collection", | ||||||
| "source_image_single", | ||||||
| # These are read-only fields | ||||||
| "pipeline", | ||||||
| "project", | ||||||
| ) | ||||||
|
|
||||||
| readonly_fields = ( | ||||||
| "task_id", | ||||||
| "scheduled_at", | ||||||
| "started_at", | ||||||
|
|
@@ -54,3 +60,11 @@ def inferred_job_type(self, obj: Job) -> str: | |||||
| "progress", | ||||||
| "result", | ||||||
| ) | ||||||
|
|
||||||
| list_filter = ( | ||||||
| "status", | ||||||
| "job_type_key", | ||||||
| "project", | ||||||
| "source_image_collection", | ||||||
| "pipeline", | ||||||
| ) | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| # Generated by Django 4.2.10 on 2025-10-30 19:54 | ||
|
|
||
| from django.db import migrations, models | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
| dependencies = [ | ||
| ("jobs", "0018_alter_job_job_type_key"), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.AddField( | ||
| model_name="job", | ||
| name="last_checked_at", | ||
| field=models.DateTimeField( | ||
| blank=True, help_text="Last time job status was checked by periodic task", null=True | ||
| ), | ||
| ), | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| # Generated manually for periodic job status checking | ||
|
|
||
| from django.db import migrations | ||
|
|
||
|
|
||
| def create_periodic_task(apps, schema_editor): | ||
| """Create periodic task to check unfinished jobs every 3 minutes.""" | ||
| try: | ||
| from django_celery_beat.models import IntervalSchedule, PeriodicTask | ||
|
|
||
| interval_schedule, _ = IntervalSchedule.objects.get_or_create( | ||
| every=3, | ||
| period=IntervalSchedule.MINUTES, | ||
| ) | ||
|
|
||
| PeriodicTask.objects.get_or_create( | ||
| name="jobs.check_incomplete_jobs", | ||
| defaults={ | ||
| "task": "ami.jobs.tasks.check_incomplete_jobs", | ||
| "interval": interval_schedule, | ||
| "enabled": True, | ||
| "description": "Check status of unfinished jobs and update if tasks disappeared", | ||
| }, | ||
| ) | ||
| except Exception as e: | ||
| print(f"Warning: Could not create periodic task: {e}") | ||
| print("You may need to create it manually in the Django admin or via shell.") | ||
|
|
||
|
|
||
| def delete_periodic_task(apps, schema_editor): | ||
| """Delete the periodic task if rolling back.""" | ||
| try: | ||
| from django_celery_beat.models import PeriodicTask | ||
|
|
||
| PeriodicTask.objects.filter(name="jobs.check_incomplete_jobs").delete() | ||
| except Exception as e: | ||
| print(f"Warning: Could not delete periodic task: {e}") | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
| dependencies = [ | ||
| ("jobs", "0019_job_last_checked_at"), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.RunPython(create_periodic_task, delete_periodic_task), | ||
| ] |
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -722,11 +722,22 @@ class Job(BaseModel): | |||||||
| # Hide old failed jobs after 3 days | ||||||||
| FAILED_CUTOFF_HOURS = 24 * 3 | ||||||||
|
|
||||||||
| # Job status checking configuration constants | ||||||||
| NO_TASK_ID_TIMEOUT_SECONDS = 300 # 5 minutes | ||||||||
| DISAPPEARED_TASK_RETRY_THRESHOLD_SECONDS = 300 # 5 minutes | ||||||||
| MAX_JOB_RUNTIME_SECONDS = 7 * 24 * 60 * 60 # 7 days | ||||||||
| STUCK_PENDING_TIMEOUT_SECONDS = 600 # 10 minutes | ||||||||
|
|
||||||||
|
||||||||
| STUCK_PENDING_NO_WORKERS_TIMEOUT_SECONDS = 600 # 10 minutes | |
| PENDING_LOG_INTERVAL_SECONDS = 60 # 1 minute |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
cancel_jobsaction is defined at line 39 but not included in the actions list. Either remove the unused action method or add it to the actions list:actions = [enqueue_jobs, retry_jobs, cancel_jobs]