-
Notifications
You must be signed in to change notification settings - Fork 11
[integration] Processing service V2 #987
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 27 commits
ae02d2e
24a15af
0da97a6
2db7d66
8a714cd
700f594
3b42e08
8ea5d7d
61fc2c5
9af597c
7ff8865
0fbe899
7899fc5
d9f8ffd
edad552
d254867
1cc890e
84ee5a2
09fee92
4480b0d
3032709
3e7ef3b
04be994
a8b94e3
1fc20b5
0a5c89e
344f883
df7eaa3
0391642
4ae27b0
4f50b3d
a8fc79a
4efdf07
f221a1a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -322,15 +322,13 @@ def run(cls, job: "Job"): | |
| """ | ||
| Procedure for an ML pipeline as a job. | ||
| """ | ||
| from ami.ml.orchestration.jobs import queue_images_to_nats | ||
|
|
||
| job.update_status(JobState.STARTED) | ||
| job.started_at = datetime.datetime.now() | ||
| job.finished_at = None | ||
| job.save() | ||
|
|
||
| # Keep track of sub-tasks for saving results, pair with batch number | ||
| save_tasks: list[tuple[int, AsyncResult]] = [] | ||
| save_tasks_completed: list[tuple[int, AsyncResult]] = [] | ||
|
|
||
| if job.delay: | ||
| update_interval_seconds = 2 | ||
| last_update = time.time() | ||
|
|
@@ -365,7 +363,7 @@ def run(cls, job: "Job"): | |
| progress=0, | ||
| ) | ||
|
|
||
| images = list( | ||
| images: list[SourceImage] = list( | ||
| # @TODO return generator plus image count | ||
| # @TODO pass to celery group chain? | ||
| job.pipeline.collect_images( | ||
|
|
@@ -389,8 +387,6 @@ def run(cls, job: "Job"): | |
| images = images[: job.limit] | ||
| image_count = len(images) | ||
| job.progress.add_stage_param("collect", "Limit", image_count) | ||
| else: | ||
| image_count = source_image_count | ||
|
|
||
| job.progress.update_stage( | ||
| "collect", | ||
|
|
@@ -401,6 +397,24 @@ def run(cls, job: "Job"): | |
| # End image collection stage | ||
| job.save() | ||
|
|
||
| if job.project.feature_flags.async_pipeline_workers: | ||
| queued = queue_images_to_nats(job, images) | ||
| if not queued: | ||
| job.logger.error("Aborting job %s because images could not be queued to NATS", job.pk) | ||
| job.progress.update_stage("collect", status=JobState.FAILURE) | ||
| job.update_status(JobState.FAILURE) | ||
| job.finished_at = datetime.datetime.now() | ||
| job.save() | ||
| return | ||
| else: | ||
| cls.process_images(job, images) | ||
|
Comment on lines
+400
to
+410
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Async ML path leaves overall job lifecycle undefined (status/finished_at, Celery status mismatch) When
Net effect: jobs can show Consider tightening this by:
This will make the async path match the synchronous 🤖 Prompt for AI Agents |
||
|
|
||
| @classmethod | ||
| def process_images(cls, job, images): | ||
| image_count = len(images) | ||
| # Keep track of sub-tasks for saving results, pair with batch number | ||
| save_tasks: list[tuple[int, AsyncResult]] = [] | ||
| save_tasks_completed: list[tuple[int, AsyncResult]] = [] | ||
| total_captures = 0 | ||
| total_detections = 0 | ||
| total_classifications = 0 | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.