[integration] Processing service V2 #987
base: main
Conversation
✅ Deploy Preview for antenna-preview canceled.
Exciting!
Pull Request Overview
This PR introduces Processing Service V2, enabling a pull-based task queue architecture using NATS JetStream instead of the push-based Celery approach. Workers can now pull tasks via HTTP endpoints, process them independently, and acknowledge completion without maintaining persistent connections.
Key changes:
- Added NATS JetStream integration for distributed task queuing with configurable visibility timeouts
- Introduced new REST API endpoints for task pulling (`/jobs/{id}/tasks`) and result submission (`/jobs/{id}/result`)
- Implemented Redis-based progress tracking to handle asynchronous worker updates
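To make the pull-based flow concrete, here is a minimal sketch of a worker loop against the two endpoints described above. The base URL, job id, payload field names, and the `run_pipeline` stub are all hypothetical; the real request/response schemas live in ami/jobs/views.py.

```python
import time

import requests

API_BASE = "https://antenna.example.org/api/v2"  # hypothetical base URL
JOB_ID = 123  # hypothetical job id


def run_pipeline(task: dict) -> dict:
    """Hypothetical stand-in for the worker's ML inference step."""
    return {"detections": [], "source_image_id": task.get("image_id")}


def worker_loop() -> None:
    while True:
        # Reserve a batch of tasks; an empty list means the queue is drained for now
        resp = requests.get(f"{API_BASE}/jobs/{JOB_ID}/tasks", params={"batch": 1}, timeout=30)
        resp.raise_for_status()
        tasks = resp.json()
        if not tasks:
            time.sleep(5)
            continue
        for task in tasks:
            result = run_pipeline(task)
            # Submit the result; on success the server ACKs the JetStream message,
            # otherwise the visibility timeout (TTR) lets another worker retry it
            requests.post(
                f"{API_BASE}/jobs/{JOB_ID}/result",
                json={"reply_subject": task.get("reply_subject"), "result": result},
                timeout=60,
            ).raise_for_status()
```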
Reviewed Changes
Copilot reviewed 15 out of 16 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| requirements/base.txt | Added nats-py dependency for NATS client support |
| object_model_diagram.md | Added comprehensive Mermaid diagram documenting ML pipeline system architecture |
| docker-compose.yml | Added NATS JetStream service with health checks and monitoring |
| config/settings/base.py | Added NATS_URL configuration setting |
| ami/utils/nats_queue.py | New TaskQueueManager class for NATS JetStream operations |
| ami/jobs/views.py | Added task pulling and result submission endpoints with pipeline filtering |
| ami/jobs/utils.py | Helper function for running async code in sync Django context |
| ami/jobs/tasks.py | New Celery task for processing pipeline results asynchronously |
| ami/jobs/task_state.py | TaskStateManager for Redis-based job progress tracking |
| ami/jobs/models.py | Added queue_images_to_nats method and NATS cleanup logic |
| ami/base/views.py | Fixed request.data handling when not a dict |
| README.md | Added NATS dashboard documentation link |
| .vscode/launch.json | Added debug configurations for Django and Celery containers |
| .envs/.local/.django | Added NATS_URL environment variable |
| .dockerignore | Expanded with comprehensive ignore patterns |
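The ami/jobs/utils.py row above describes a helper for running async code in a sync Django context. A minimal sketch of what such a helper might look like, assuming it simply wraps `asyncio.run` with error logging (the actual implementation in ami/ml/orchestration/utils.py may differ):

```python
import asyncio
import logging
from typing import Any, Coroutine

logger = logging.getLogger(__name__)


def run_in_async_loop(coro: Coroutine[Any, Any, Any]) -> Any:
    """Run a coroutine to completion from synchronous (e.g. Django/Celery) code."""
    try:
        # asyncio.run creates a fresh event loop; this fails if a loop is already
        # running in the current thread, which is fine for sync Django/Celery code
        return asyncio.run(coro)
    except Exception:
        logger.exception("Async operation failed")
        return None
```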
Comments suppressed due to low confidence (1)
object_model_diagram.md:1
- The comment at line 13 appears to be template text from instructions rather than actual documentation content. This namedtuple field description doesn't match the file's purpose as an object model diagram.
# Object Model Diagram: ML Pipeline System
Note: CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds NATS JetStream queuing and management, Redis-backed per-job task state, a Celery result-processing task, an async ML job execution path behind a feature flag, API endpoints to reserve/submit tasks, Docker/env/docs entries for NATS, and orchestration helpers.
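For orientation, here is a minimal sketch of the JetStream publish/pull pattern the walkthrough describes, using the nats-py client added in requirements/base.txt. The stream, subject, and consumer names are hypothetical; the real naming scheme lives in `TaskQueueManager`.

```python
import asyncio

import nats
from nats.js.api import AckPolicy, ConsumerConfig


async def main() -> None:
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # One stream per job, one subject for its image tasks (names are hypothetical)
    await js.add_stream(name="job-123", subjects=["tasks.job-123"])
    await js.publish("tasks.job-123", b'{"image_id": 1}')

    # Durable pull consumer with an explicit-ACK policy: unacked messages are
    # redelivered after ack_wait, which acts as the visibility timeout (TTR)
    sub = await js.pull_subscribe(
        "tasks.job-123",
        durable="worker-job-123",
        config=ConsumerConfig(ack_policy=AckPolicy.EXPLICIT, ack_wait=300),
    )
    for msg in await sub.fetch(batch=1, timeout=5):
        print(msg.data)
        await msg.ack()  # settle the message so it is not redelivered

    await nc.close()


asyncio.run(main())
```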
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant Client
participant MLJob
participant Flags as FeatureFlags
participant QueueMgr as TaskQueueManager
participant State as TaskStateManager
participant Worker
participant Celery as process_pipeline_result
participant DB as Database
Client->>MLJob: run(job, images)
MLJob->>Flags: check async_pipeline_workers
alt async enabled
MLJob->>State: initialize_job(image_ids)
MLJob->>QueueMgr: queue_images_to_nats(job, images)
loop per image
QueueMgr->>QueueMgr: publish_task(job_id, message)
end
QueueMgr-->>MLJob: return success/failure
else sync path
MLJob->>MLJob: process_images(job, images)
MLJob->>DB: persist results & progress
end
Note over Worker,QueueMgr: Workers later reserve and process queued tasks
Worker->>QueueMgr: reserve_task(job_id, batch)
Worker->>Celery: invoke pipeline, produce result + reply_subject
Celery->>State: update_state(processed_ids, "process", request_id)
Celery->>DB: save pipeline results
Celery->>QueueMgr: acknowledge_task(reply_subject)
Celery->>State: update_state(processed_ids, "results", request_id)
```

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45–75 minutes
Pre-merge checks and finishing touches: ❌ failed checks (1 warning), ✅ passed checks (2 passed).
Actionable comments posted: 5
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (19)
- .dockerignore (1 hunks)
- .envs/.local/.django (1 hunks)
- .gitignore (1 hunks)
- .vscode/launch.json (1 hunks)
- README.md (1 hunks)
- ami/base/views.py (1 hunks)
- ami/jobs/models.py (8 hunks)
- ami/jobs/tasks.py (2 hunks)
- ami/jobs/views.py (3 hunks)
- ami/main/models.py (1 hunks)
- ami/ml/orchestration/jobs.py (1 hunks)
- ami/ml/orchestration/nats_queue.py (1 hunks)
- ami/ml/orchestration/task_state.py (1 hunks)
- ami/ml/orchestration/utils.py (1 hunks)
- ami/utils/requests.py (2 hunks)
- config/settings/base.py (2 hunks)
- docker-compose.yml (4 hunks)
- object_model_diagram.md (1 hunks)
- requirements/base.txt (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
ami/ml/orchestration/nats_queue.py (1)
- ami/jobs/views.py (1): result (256-339)

ami/ml/orchestration/task_state.py (1)
- ami/ml/orchestration/jobs.py (1): cleanup (20-23)

ami/jobs/views.py (3)
- ami/jobs/tasks.py (1): process_pipeline_result (45-138)
- ami/jobs/models.py (4): Job (727-1012), JobState (27-63), logger (997-1006), final_states (58-59)
- ami/ml/orchestration/nats_queue.py (2): TaskQueueManager (28-294), reserve_task (152-208)

ami/jobs/tasks.py (5)
- ami/ml/orchestration/nats_queue.py (2): TaskQueueManager (28-294), acknowledge_task (210-229)
- ami/ml/orchestration/task_state.py (3): TaskStateManager (17-97), mark_images_processed (48-61), get_progress (63-90)
- ami/ml/orchestration/utils.py (1): run_in_async_loop (8-18)
- ami/jobs/models.py (5): Job (727-1012), JobState (27-63), logger (997-1006), update_stage (168-188), save (947-958)
- ami/ml/models/pipeline.py (3): save (1115-1121), save_results (809-917), save_results (1107-1108)

ami/ml/orchestration/jobs.py (4)
- ami/jobs/models.py (2): Job (727-1012), logger (997-1006)
- ami/ml/orchestration/nats_queue.py (3): TaskQueueManager (28-294), cleanup_job_resources (278-294), publish_task (119-150)
- ami/ml/orchestration/task_state.py (3): TaskStateManager (17-97), cleanup (92-97), initialize_job (38-46)
- ami/ml/orchestration/utils.py (1): run_in_async_loop (8-18)

ami/ml/orchestration/utils.py (1)
- ami/jobs/models.py (1): logger (997-1006)

ami/base/views.py (1)
- ami/main/api/views.py (1): get (1595-1651)

ami/jobs/models.py (3)
- ami/ml/orchestration/jobs.py (1): queue_images_to_nats (28-107)
- ami/main/models.py (1): SourceImage (1622-1870)
- ami/ml/models/pipeline.py (2): process_images (163-278), process_images (1091-1105)
🪛 LanguageTool
object_model_diagram.md
[grammar] ~167-~167: Ensure spelling is correct
Context: ...ts 4. Job tracks progress through JobProgress and JobProgressStageDetail
(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)
🪛 markdownlint-cli2 (0.18.1)
object_model_diagram.md
15-15: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
31-31: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
38-38: Bare URL used
(MD034, no-bare-urls)
39-39: Bare URL used
(MD034, no-bare-urls)
40-40: Bare URL used
(MD034, no-bare-urls)
41-41: Bare URL used
(MD034, no-bare-urls)
42-42: Bare URL used
(MD034, no-bare-urls)
42-42: Bare URL used
(MD034, no-bare-urls)
43-43: Bare URL used
(MD034, no-bare-urls)
61-61: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
77-77: Bare URL used
(MD034, no-bare-urls)
97-97: Bare URL used
(MD034, no-bare-urls)
118-118: Code block style
Expected: fenced; Actual: indented
(MD046, code-block-style)
122-122: Code block style
Expected: fenced; Actual: indented
(MD046, code-block-style)
126-126: Code block style
Expected: fenced; Actual: indented
(MD046, code-block-style)
130-130: Code block style
Expected: fenced; Actual: indented
(MD046, code-block-style)
🪛 Ruff (0.14.2)
ami/ml/orchestration/nats_queue.py
70-70: Unused method argument: ttr
(ARG002)
73-73: Avoid specifying long messages outside the exception class
(TRY003)
81-81: Do not catch blind exception: Exception
(BLE001)
94-94: Avoid specifying long messages outside the exception class
(TRY003)
103-103: Do not catch blind exception: Exception
(BLE001)
132-132: Avoid specifying long messages outside the exception class
(TRY003)
146-146: Consider moving this statement to an else block
(TRY300)
148-148: Do not catch blind exception: Exception
(BLE001)
149-149: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
164-164: Avoid specifying long messages outside the exception class
(TRY003)
206-206: Do not catch blind exception: Exception
(BLE001)
207-207: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
221-221: Avoid specifying long messages outside the exception class
(TRY003)
226-226: Consider moving this statement to an else block
(TRY300)
227-227: Do not catch blind exception: Exception
(BLE001)
228-228: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
242-242: Avoid specifying long messages outside the exception class
(TRY003)
250-250: Consider moving this statement to an else block
(TRY300)
251-251: Do not catch blind exception: Exception
(BLE001)
252-252: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
266-266: Avoid specifying long messages outside the exception class
(TRY003)
273-273: Consider moving this statement to an else block
(TRY300)
274-274: Do not catch blind exception: Exception
(BLE001)
275-275: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/ml/orchestration/task_state.py
35-35: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
36-36: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
ami/jobs/views.py
33-43: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
73-73: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
218-218: Unused method argument: pk
(ARG002)
236-236: Avoid specifying long messages outside the exception class
(TRY003)
244-244: Loop control variable i not used within loop body
Rename unused i to _i
(B007)
283-283: Avoid specifying long messages outside the exception class
(TRY003)
288-288: Avoid specifying long messages outside the exception class
(TRY003)
298-298: Avoid specifying long messages outside the exception class
(TRY003)
301-301: Avoid specifying long messages outside the exception class
(TRY003)
322-322: Do not catch blind exception: Exception
(BLE001)
323-323: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/jobs/tasks.py
45-45: Unused function argument: self
(ARG001)
120-120: Do not catch blind exception: Exception
(BLE001)
121-121: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
133-133: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
136-136: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
config/settings/base.py
28-28: Unused noqa directive (non-enabled: E231, E501)
Remove unused noqa directive
(RUF100)
ami/ml/orchestration/jobs.py
69-69: Loop control variable i not used within loop body
Rename unused i to _i
(B007)
78-78: Do not catch blind exception: Exception
(BLE001)
ami/ml/orchestration/utils.py
14-14: Do not catch blind exception: Exception
(BLE001)
15-15: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/jobs/models.py
75-75: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
430-430: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
482-482: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Actionable comments posted: 2
♻️ Duplicate comments (1)
ami/jobs/tasks.py (1)
80-143: Don't mark progress until after durable save + ACK

We remove image IDs from Redis and advance the "process" stage before `save_results` runs, and well before we confirm the JetStream ACK. If `save_results` throws or the ACK fails, the task retries (or JetStream redelivers), yet Redis already shows the images done and the job stage is `SUCCESS`. At that point the worker cannot recover accurate progress, and the retry will double-write results. Move both `update_state(..., "process")` and the corresponding `_update_job_progress` call to after a successful `save_results` and ACK, and make ACK failure fatal so Celery retries before we mutate Redis. Perform the same gating for the "results" stage so we never remove IDs unless the message is fully settled.

```diff
-state_manager = TaskStateManager(job_id)
-
-progress_info = state_manager.update_state(processed_image_ids, stage="process", request_id=self.request.id)
-if not progress_info:
-    ...
-try:
-    _update_job_progress(job_id, "process", progress_info.percentage)
-    ...
-    if pipeline_result:
-        job.pipeline.save_results(...)
-    ...
-    ack_success = run_in_async_loop(...)
-    if ack_success:
-        ...
-        # Update job stage with calculated progress
-        progress_info = state_manager.update_state(processed_image_ids, stage="results", request_id=self.request.id)
-        if not progress_info:
-            ...
-        _update_job_progress(job_id, "results", progress_info.percentage)
+state_manager = TaskStateManager(job_id)
+
+try:
+    job = Job.objects.get(pk=job_id)
+    ...
+    if pipeline_result:
+        job.pipeline.save_results(results=pipeline_result, job_id=job.pk)
+    ...
+    ack_success = run_in_async_loop(...)
+    if not ack_success:
+        raise RuntimeError(f"Failed to acknowledge task via NATS: {reply_subject}")
+
+    progress_info = state_manager.update_state(
+        processed_image_ids, stage="process", request_id=self.request.id
+    )
+    if not progress_info:
+        raise self.retry(countdown=5, max_retries=10)
+    _update_job_progress(job_id, "process", progress_info.percentage)
+
+    progress_info = state_manager.update_state(
+        processed_image_ids, stage="results", request_id=self.request.id
+    )
+    if not progress_info:
+        raise self.retry(countdown=5, max_retries=10)
+    _update_job_progress(job_id, "results", progress_info.percentage)
```
🧹 Nitpick comments (4)
ami/jobs/utils.py (1)
36-48: Use `logging.exception` in exception handlers for better diagnostics.

In exception handlers, prefer `logging.exception` over `logging.error` to automatically include the stack trace, which aids debugging. Based on learnings.

Apply this diff:

```diff
 try:
     image = SourceImage.objects.select_related("deployment__project").get(pk=image_id)
 except SourceImage.DoesNotExist:
-    logger.error(f"SourceImage with id {image_id} does not exist")
+    logger.exception(f"SourceImage with id {image_id} does not exist")
     raise

 # Fetch the pipeline and validate it exists
 try:
     pipeline = Pipeline.objects.get(pk=pipeline_id)
 except Pipeline.DoesNotExist:
-    logger.error(f"Pipeline with id {pipeline_id} does not exist")
+    logger.exception(f"Pipeline with id {pipeline_id} does not exist")
     raise
```

ami/jobs/management/commands/process_single_image.py (3)
51-65: Consider removing duplicate validation logic.

The validation of `SourceImage` and `Pipeline` here duplicates the validation already performed inside `submit_single_image_job` (lines 36-48 in ami/jobs/utils.py). While this provides earlier feedback to the user, it creates maintenance overhead if validation logic changes. If you prefer immediate user feedback, keep this validation. Otherwise, rely on the exceptions raised by `submit_single_image_job` and handle them in the submission try-except block (lines 71-78).

71-78: Replace blind `Exception` catch with specific exceptions.

Catching `Exception` is too broad and may mask unexpected errors. Catch only the specific exceptions that `submit_single_image_job` can raise.

Apply this diff:

```diff
 try:
     job = submit_single_image_job(
         image_id=image_id,
         pipeline_id=pipeline_id,
         job_name=job_name,
     )
-except Exception as e:
-    raise CommandError(f"Failed to submit job: {str(e)}")
+except (SourceImage.DoesNotExist, Pipeline.DoesNotExist, Project.DoesNotExist) as e:
+    raise CommandError(f"Failed to submit job: {str(e)}") from e
```

Note: If you keep the validation at lines 51-65, `SourceImage.DoesNotExist` and `Pipeline.DoesNotExist` won't be raised here, so you can simplify to just `Project.DoesNotExist`.

161-161: Remove unused `noqa` directive.

The `noqa: E221` directive is unnecessary as E221 is not an enabled rule in your Ruff configuration.

Apply this diff:

```diff
 for error in job.progress.errors[-5:]:  # Last 5 errors
-    self.stdout.write(f"  - {error}")  # noqa: E221
+    self.stdout.write(f"  - {error}")
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- ami/jobs/management/commands/README.md (1 hunks)
- ami/jobs/management/commands/process_single_image.py (1 hunks)
- ami/jobs/tasks.py (3 hunks)
- ami/jobs/utils.py (1 hunks)
- ami/ml/orchestration/__init__.py (1 hunks)
- ami/ml/orchestration/jobs.py (1 hunks)
- ami/ml/orchestration/task_state.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
ami/jobs/management/commands/process_single_image.py (2)
- ami/jobs/utils.py (1): submit_single_image_job (12-78)
- ami/main/models.py (1): Classification (2208-2352)

ami/ml/orchestration/task_state.py (1)
- ami/ml/orchestration/jobs.py (1): cleanup (20-23)

ami/jobs/utils.py (1)
- ami/jobs/models.py (2): Job (734-1019), enqueue (816-833)

ami/jobs/tasks.py (6)
- ami/jobs/views.py (1): result (256-339)
- ami/ml/orchestration/nats_queue.py (2): TaskQueueManager (28-294), acknowledge_task (210-229)
- ami/ml/orchestration/task_state.py (2): TaskStateManager (17-126), update_state (55-84)
- ami/ml/orchestration/utils.py (1): run_in_async_loop (8-18)
- ami/jobs/models.py (4): Job (734-1019), logger (1004-1013), update_stage (168-188), save (954-965)
- ami/ml/models/pipeline.py (3): save_results (809-917), save_results (1107-1108), save (1115-1121)

ami/ml/orchestration/jobs.py (4)
- ami/jobs/models.py (5): Job (734-1019), JobState (27-63), logger (1004-1013), update_stage (168-188), save (954-965)
- ami/ml/orchestration/nats_queue.py (2): cleanup_job_resources (278-294), publish_task (119-150)
- ami/ml/orchestration/task_state.py (3): TaskStateManager (17-126), cleanup (120-126), initialize_job (38-50)
- ami/ml/orchestration/utils.py (1): run_in_async_loop (8-18)
🪛 Ruff (0.14.3)
ami/jobs/management/commands/process_single_image.py
44-44: Unused method argument: args
(ARG002)
58-58: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
58-58: Avoid specifying long messages outside the exception class
(TRY003)
65-65: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
65-65: Avoid specifying long messages outside the exception class
(TRY003)
77-77: Do not catch blind exception: Exception
(BLE001)
78-78: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
78-78: Avoid specifying long messages outside the exception class
(TRY003)
78-78: Use explicit conversion flag
Replace with conversion flag
(RUF010)
161-161: Unused noqa directive (non-enabled: E221)
Remove unused noqa directive
(RUF100)
ami/ml/orchestration/task_state.py
35-35: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
36-36: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
53-53: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
ami/jobs/utils.py
40-40: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
47-47: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/ml/orchestration/__init__.py
5-5: Unused noqa directive (unused: F403; non-enabled: F401)
Remove unused noqa directive
(RUF100)
ami/jobs/tasks.py
128-128: Do not catch blind exception: Exception
(BLE001)
129-129: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
140-140: Abstract raise to an inner function
(TRY301)
144-144: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
147-147: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/ml/orchestration/jobs.py
69-69: Loop control variable i not used within loop body
Rename unused i to _i
(B007)
78-78: Do not catch blind exception: Exception
(BLE001)
🔇 Additional comments (4)
ami/jobs/utils.py (1)
56-78: Well-structured job creation and enqueuing flow.

The implementation correctly generates a default job name, creates the Job with appropriate associations (project, pipeline, image), and enqueues it via Celery. Logging at key points aids observability.
ami/jobs/management/commands/process_single_image.py (3)
18-42: Well-designed CLI interface.

The argument definitions are clear and provide good defaults. The `--wait` flag with configurable `--poll-interval` offers flexible UX for both scripted and interactive use.

100-131: Excellent job polling and progress display implementation.

The polling loop efficiently tracks status changes, provides live progress updates using `\r` for in-place updates, and gracefully handles `KeyboardInterrupt`. This creates a polished user experience.

137-165: Comprehensive result reporting enhances debuggability.

The result display logic provides actionable feedback for success, failure, and unexpected statuses, including detection/classification counts and error summaries. This is particularly valuable for testing and debugging workflows.
ami/jobs/utils.py (outdated)

```python
# Infer project from image if not provided
if project_id is None:
    project = image.deployment.project
else:
    project = Project.objects.get(pk=project_id)
```
Handle potential Project.DoesNotExist exception.
When project_id is explicitly provided, Project.objects.get(pk=project_id) can raise Project.DoesNotExist, but this exception is neither caught nor documented in the function's Raises section.
Wrap the fetch in a try-except block and document the exception:
```diff
 # Infer project from image if not provided
 if project_id is None:
     project = image.deployment.project
 else:
-    project = Project.objects.get(pk=project_id)
+    try:
+        project = Project.objects.get(pk=project_id)
+    except Project.DoesNotExist:
+        logger.exception(f"Project with id {project_id} does not exist")
+        raise
```

And update the docstring:

```diff
 Raises:
     SourceImage.DoesNotExist: If the image doesn't exist
     Pipeline.DoesNotExist: If the pipeline doesn't exist
+    Project.DoesNotExist: If the provided project_id doesn't exist
```

🤖 Prompt for AI Agents
In ami/jobs/utils.py around lines 50 to 54, the code calls
Project.objects.get(pk=project_id) without handling Project.DoesNotExist; wrap
that call in a try/except that catches Project.DoesNotExist and re-raises it
with clearer context (e.g., include project_id and a brief message) using "raise
Project.DoesNotExist(... ) from e" so stacktrace is preserved, and update the
function docstring's Raises section to list Project.DoesNotExist with the new
behavior.
ami/ml/orchestration/task_state.py (outdated)

```python
def initialize_job(self, image_ids: list[str], stages: list[str]) -> None:
    """
    Initialize job tracking with a list of image IDs to process.

    Args:
        image_ids: List of image IDs that need to be processed
        stages: List of stages to track for each image
    """
    self.stages = stages
    for stage in stages:
        cache.set(self._get_pending_key(stage), image_ids, timeout=self.TIMEOUT)

    cache.set(self._total_key, len(image_ids), timeout=self.TIMEOUT)

def _get_pending_key(self, stage: str) -> str:
    return f"{self._pending_key}:{stage}"  # noqa E231

def update_state(
    self,
    processed_image_ids: set[str],
    stage: str,
    request_id: str,
) -> None | TaskProgress:
    """
    Update the task state with newly processed images.

    Args:
        processed_image_ids: Set of image IDs that have just been processed
    """
    # Create a unique lock key for this job
    lock_key = f"job:{self.job_id}:process_results_lock"
    lock_timeout = 360  # 6 minutes (matches task time_limit)
    lock_acquired = cache.add(lock_key, request_id, timeout=lock_timeout)
    if not lock_acquired:
        return None

    try:
        # Update progress tracking in Redis
        progress_info = self._get_progress(processed_image_ids, stage)
        return progress_info
    finally:
        # Always release the lock when done
        current_lock_value = cache.get(lock_key)
        # Only delete if we still own the lock (prevents race condition)
        if current_lock_value == request_id:
            cache.delete(lock_key)
            logger.debug(f"Released lock for job {self.job_id}, task {request_id}")

def _get_progress(self, processed_image_ids: set[str], stage: str) -> TaskProgress | None:
    """
    Get current progress information for the job.

    Returns:
        TaskProgress namedtuple with fields:
        - remaining: Number of images still pending (or None if not tracked)
        - total: Total number of images (or None if not tracked)
        - processed: Number of images processed (or None if not tracked)
        - percentage: Progress as float 0.0-1.0 (or None if not tracked)
    """
    pending_images = cache.get(self._get_pending_key(stage))
    total_images = cache.get(self._total_key)
    if pending_images is None or total_images is None:
        return None
    remaining_images = [img_id for img_id in pending_images if img_id not in processed_image_ids]
    assert len(pending_images) >= len(remaining_images)
    cache.set(self._get_pending_key(stage), remaining_images, timeout=self.TIMEOUT)

    remaining = len(remaining_images)
    processed = total_images - remaining
    percentage = float(processed) / total_images if total_images > 0 else 1.0
    logger.info(
        f"Pending images from Redis for job {self.job_id} {stage}: "
        f"{remaining}/{total_images}: {percentage*100}%"
    )

    return TaskProgress(
        remaining=remaining,
        total=total_images,
        processed=processed,
        percentage=percentage,
    )

def cleanup(self) -> None:
    """
    Delete all Redis keys associated with this job.
    """
    for stage in self.stages:
        cache.delete(self._get_pending_key(stage))
    cache.delete(self._total_key)
```
Persist the tracked stages so cleanup() works in new processes
`cleanup()` assumes `self.stages` is populated on this instance, but `initialize_job()` only sets it in-memory. As soon as another worker or process instantiates `TaskStateManager(job_id)` (the typical pattern for cleanup after a job finishes), `self.stages` is missing and `cleanup()` raises `AttributeError`, leaving Redis keys behind. Persist the stage list alongside the other job keys so any instance can retrieve it before cleaning up.

```diff
 class TaskStateManager:
@@
-        self._pending_key = f"job:{job_id}:pending_images"  # noqa E231
-        self._total_key = f"job:{job_id}:pending_images_total"  # noqa E231
+        self._pending_key = f"job:{job_id}:pending_images"
+        self._total_key = f"job:{job_id}:pending_images_total"
+        self._stages_key = f"job:{job_id}:pending_images_stages"
+        self.stages: list[str] = cache.get(self._stages_key, [])
@@
-        self.stages = stages
+        self.stages = stages
+        cache.set(self._stages_key, stages, timeout=self.TIMEOUT)
@@
     def cleanup(self) -> None:
@@
         for stage in self.stages:
             cache.delete(self._get_pending_key(stage))
         cache.delete(self._total_key)
+        cache.delete(self._stages_key)
```

Committable suggestion skipped: line range outside the PR's diff.
🧰 Tools
🪛 Ruff (0.14.3)
53-53: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
Actionable comments posted: 1
🧹 Nitpick comments (1)
ami/jobs/models.py (1)
412-413: Consider adding type hints and a docstring.

The new `process_images` method would benefit from explicit type hints for its parameters and a docstring describing its purpose, especially since the codebase is adding type annotations (see line 366).

Example:

```python
@classmethod
def process_images(cls, job: "Job", images: list[SourceImage]) -> None:
    """
    Process images through the ML pipeline in batches.

    Args:
        job: The Job instance to update with progress
        images: List of SourceImage instances to process
    """
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- ami/jobs/models.py (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/jobs/models.py (3)
- ami/ml/orchestration/jobs.py (1): queue_images_to_nats (28-107)
- ami/main/models.py (12): SourceImage (1622-1870), save (320-323), save (883-896), save (1118-1121), save (1461-1468), save (1560-1563), save (1828-1831), save (2109-2130), save (2346-2352), save (2525-2530), save (2813-2830), save (3450-3453)
- ami/ml/models/pipeline.py (3): save (1115-1121), process_images (163-278), process_images (1091-1105)
🪛 Ruff (0.14.3)
ami/jobs/models.py
75-75: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
437-437: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
489-489: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
🔇 Additional comments (3)
ami/jobs/models.py (3)
325-326: Good use of local import to avoid circular dependencies.

Importing `queue_images_to_nats` within the method scope is appropriate here, especially if it helps avoid circular import issues between the jobs and orchestration modules.

366-366: Good addition of type annotation.

The explicit type annotation `list[SourceImage]` improves code clarity and enables better IDE support.

400-410: LGTM: Async routing with proper failure handling.

The feature-flag-driven routing logic correctly handles the failure case when `queue_images_to_nats` returns `False` by marking the job as `FAILURE`, logging the error, and exiting early. This addresses the critical concern from previous reviews.
Actionable comments posted: 0
♻️ Duplicate comments (1)
config/settings/base.py (1)
27-29: Remove the unused `noqa` directive.

The `# noqa: E231, E501` comment on line 28 is suppressing lint rules that aren't triggered, causing a Ruff RUF100 violation. This issue was previously flagged and should be addressed.
🧹 Nitpick comments (2)
README.md (1)
43-43: Consider wrapping the URL in angle brackets.

To satisfy the Markdown linter, wrap the bare URL in angle brackets: `<https://natsdashboard.com/>`. Apply this diff:

```diff
-- NATS dashboard: https://natsdashboard.com/ (Add localhost)
+- NATS dashboard: <https://natsdashboard.com/> (Add localhost)
```

ami/utils/requests.py (1)
149-154: Consider using OpenApiTypes for consistency.

For consistency with the newly added parameters, consider updating `project_id_doc_param` to use `OpenApiTypes.INT` instead of the built-in `int` type.

```diff
 project_id_doc_param = OpenApiParameter(
     name="project_id",
     description="Filter by project ID",
     required=False,
-    type=int,
+    type=OpenApiTypes.INT,
 )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- .gitignore (1 hunks)
- README.md (1 hunks)
- ami/utils/requests.py (2 hunks)
- config/settings/base.py (2 hunks)
🧰 Additional context used
🪛 markdownlint-cli2 (0.18.1)
README.md
43-43: Bare URL used
(MD034, no-bare-urls)
🪛 Ruff (0.14.3)
config/settings/base.py
28-28: Unused noqa directive (non-enabled: E231, E501)
Remove unused noqa directive
(RUF100)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Redirect rules
- GitHub Check: Header rules
- GitHub Check: Pages changed
- GitHub Check: test
🔇 Additional comments (4)
.gitignore (1)
280-281: Clean .gitignore addition.

The new entry appropriately excludes HuggingFace cache directories from version control, following established patterns in the file. The formatting and spacing are consistent with surrounding entries.
config/settings/base.py (1)
266-268: LGTM!

The NATS_URL configuration is properly added with a sensible default and follows the existing pattern for environment variable configuration.
ami/utils/requests.py (2)
156-167: LGTM! Clear and well-structured parameter definitions.

Both `ids_only_param` and `incomplete_only_param` are correctly defined with appropriate types and clear descriptions for the OpenAPI schema.

168-173: Batch parameter validation is properly implemented.

The endpoint already validates the batch parameter at ami/jobs/views.py:230 using `IntegerField(required=False, min_value=1).clean()`, which ensures it is a positive integer with a default of 1. No further action is needed.
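As a sketch of the validation pattern referenced here (assuming Django's forms field, which is what the `.clean()` call suggests; the actual code in ami/jobs/views.py may differ):

```python
from django import forms


def parse_batch_param(raw_value: str | None) -> int:
    # Positive integer with a default of 1; raises ValidationError on bad input
    return forms.IntegerField(required=False, min_value=1).clean(raw_value) or 1
```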
…result endpoints)

- Add ids_only and incomplete_only query parameters to job list endpoint
- Add JobFilterSet with pipeline__slug filtering and search fields
- Add tasks() endpoint (stubbed) for NATS JetStream integration
- Add result() endpoint (stubbed) for pipeline result processing
- Add utility OpenApiParameter definitions for new query params

The tasks() and result() endpoints are documented and stubbed out, awaiting NATS and Celery task dependencies from PR RolnickLab#987.
Actionable comments posted: 0
♻️ Duplicate comments (1)
config/settings/base.py (1)
27-29: Remove the unused `noqa` directive.

The `# noqa: E231, E501` comment is suppressing non-enabled rules, causing a lint failure (RUF100). This was previously flagged but remains unaddressed. Apply this diff:

```diff
 EXTERNAL_BASE_URL = env(
-    "EXTERNAL_BASE_URL", default=f"http://{EXTERNAL_HOSTNAME}"  # noqa: E231, E501  # type: ignore[no-untyped-call]
+    "EXTERNAL_BASE_URL",
+    default=f"http://{EXTERNAL_HOSTNAME}",
 )
+# type: ignore[no-untyped-call]
```
🧹 Nitpick comments (1)
README.md (1)
69-69: Use markdown link syntax and clarify the instruction.

The bare URL violates markdown linting (MD034), and the "(Add localhost)" instruction could be clearer for users unfamiliar with the NATS dashboard.
Apply this diff:
```diff
-- NATS dashboard: https://natsdashboard.com/ (Add localhost)
+- [NATS dashboard](https://natsdashboard.com/) (Add `nats://localhost:4222` as a server)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- .envs/.local/.django (1 hunks)
- README.md (1 hunks)
- ami/main/models.py (1 hunks)
- config/settings/base.py (2 hunks)
- docker-compose.yml (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- docker-compose.yml
🧰 Additional context used
🪛 markdownlint-cli2 (0.18.1)
README.md
69-69: Bare URL used
(MD034, no-bare-urls)
🪛 Ruff (0.14.5)
config/settings/base.py
28-28: Unused noqa directive (non-enabled: E231, E501)
Remove unused noqa directive
(RUF100)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Redirect rules
- GitHub Check: Header rules
- GitHub Check: test
- GitHub Check: Pages changed
🔇 Additional comments (3)
ami/main/models.py (1)
219-219: LGTM! Clean feature flag addition.

The new `async_pipeline_workers` flag is well-named, properly documented, and follows the existing pattern. The default of `False` is appropriate for a new opt-in feature.

config/settings/base.py (1)

267-269: LGTM! Clean NATS configuration addition.

The new `NATS_URL` configuration follows the established pattern (similar to `REDIS_URL`) and uses a sensible default for local development.

.envs/.local/.django (1)

15-16: LGTM! Correct Docker environment configuration.

The `NATS_URL` configuration correctly uses the Docker service name `nats` for container networking and follows the established pattern of other service URLs in this file.
Actionable comments posted: 1
♻️ Duplicate comments (2)
ami/jobs/models.py (1)
136-136: Drop unnecessary E713 `noqa` directives on these ValueErrors

These two `ValueError` lines only contain string literals; flake8's E713 ("test for membership should be 'not in'") does not apply here, so `# noqa E713` isn't suppressing anything and just adds noise. Recommend removing both trailing `# noqa E713` comments and letting linters run normally on these lines.

Also applies to: 143-143
ami/ml/orchestration/task_state.py (1)
27-37: Persist `stages` so `cleanup()` works across processes and doesn't leak keys

Right now `initialize_job()` only records `self.stages` on the current instance. If `TaskStateManager(job_id)` is later constructed in a different worker/process (the typical case for cleanup), `self.stages` is undefined and `cleanup()` will either raise `AttributeError` or silently skip stage-specific keys, leaving Redis entries behind.

To make cleanup robust across processes (a sketch follows this list):

- Persist the stage list in cache (e.g., `_stages_key = f"job:{job_id}:pending_images_stages"`) in `initialize_job()`.
- In `__init__`, read `self.stages` from that key (defaulting to an empty list).
- In `cleanup()`, iterate over `self.stages` and then delete both the total key and the stages key.

This ensures any `TaskStateManager(job_id)` instance can safely clean up all keys even if it never called `initialize_job()` itself.

Also applies to: 38-51, 120-126
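A minimal sketch of the persisted-stages pattern described above, assuming Django's cache API and the key names suggested in the earlier diff:

```python
from django.core.cache import cache


class TaskStateManager:
    TIMEOUT = 86400 * 7  # 7 days in seconds

    def __init__(self, job_id: str) -> None:
        self.job_id = job_id
        self._total_key = f"job:{job_id}:pending_images_total"
        self._stages_key = f"job:{job_id}:pending_images_stages"
        # Restore the stage list from Redis so a fresh instance can still clean up
        self.stages: list[str] = cache.get(self._stages_key, [])

    def _get_pending_key(self, stage: str) -> str:
        return f"job:{self.job_id}:pending_images:{stage}"

    def initialize_job(self, image_ids: list[str], stages: list[str]) -> None:
        self.stages = stages
        # Persist the stage list so other processes can discover it
        cache.set(self._stages_key, stages, timeout=self.TIMEOUT)
        for stage in stages:
            cache.set(self._get_pending_key(stage), image_ids, timeout=self.TIMEOUT)
        cache.set(self._total_key, len(image_ids), timeout=self.TIMEOUT)

    def cleanup(self) -> None:
        for stage in self.stages:
            cache.delete(self._get_pending_key(stage))
        cache.delete(self._total_key)
        cache.delete(self._stages_key)
```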
🧹 Nitpick comments (1)
ami/jobs/models.py (1)
412-528: `MLJob.process_images` extraction keeps v1 behavior together and readable

Pulling the synchronous image-processing logic into `MLJob.process_images` keeps the legacy path intact while making the v1 flow easier to follow and evolve alongside the async path. Batching, progress updates, and save-result subtask handling all appear consistent with the prior inline logic.

Optional: add type hints (`job: Job`, `images: list[SourceImage]`) and a brief docstring on `process_images` to make its contract and expectations explicit.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- .envs/.local/.django (1 hunks)
- ami/jobs/models.py (4 hunks)
- ami/main/models.py (1 hunks)
- ami/ml/orchestration/task_state.py (1 hunks)
- config/settings/base.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- ami/main/models.py
- config/settings/base.py
🧰 Additional context used
🧬 Code graph analysis (2)
ami/jobs/models.py (3)
- ami/base/schemas.py (1): ConfigurableStageParam (6-12)
- ami/ml/orchestration/jobs.py (1): queue_images_to_nats (28-107)
- ami/main/models.py (12): SourceImage (1666-1914), save (323-326), save (904-917), save (1139-1142), save (1482-1489), save (1581-1584), save (1872-1875), save (2153-2174), save (2390-2396), save (2569-2574), save (2857-2874), save (3494-3497)

ami/ml/orchestration/task_state.py (1)
- ami/ml/orchestration/jobs.py (1): cleanup (20-23)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Redirect rules
- GitHub Check: Header rules
- GitHub Check: Pages changed
- GitHub Check: test
🔇 Additional comments (1)
.envs/.local/.django (1)
15-16: NATS_URL env wiring looks correct

The new `NATS_URL=nats://nats:4222` entry is consistent with the other service URLs and appropriate for local Docker-based development; no issues from a config or secrets standpoint.
Actionable comments posted: 4
♻️ Duplicate comments (1)
ami/jobs/models.py (1)
366-377: Zero-image jobs in async mode still look "running/successful" but never complete cleanly

If `images` is empty and `async_pipeline_workers` is `True`:

- `queue_images_to_nats` initializes Redis state and sets the `"results"` stage to `SUCCESS` with `progress=1.0`, then returns `True`.
- `MLJob.run` treats this as success and returns without setting `finished_at` or explicitly marking the job as completed in a way that distinguishes "nothing to do" from "queued work".
- The legacy sync path (`process_images`) does finalize zero-image jobs (sets `SUCCESS` and `finished_at`).

This reproduces the earlier review concern: "empty" async jobs can appear stuck or oddly reported compared to v1 behavior.

A simple fix is to short-circuit in `MLJob.run` before the async branch:

```diff
 source_image_count = len(images)
 job.progress.update_stage("collect", total_images=source_image_count)

+if job.project.feature_flags.async_pipeline_workers and source_image_count == 0:
+    job.logger.info("No images to process; marking async ML job as SUCCESS")
+    job.progress.update_stage("process", status=JobState.SUCCESS, progress=1.0)
+    job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
+    job.update_status(JobState.SUCCESS, save=False)
+    job.finished_at = datetime.datetime.now()
+    job.save()
+    return
+
 if job.shuffle and source_image_count > 1:
```

This keeps async behavior aligned with the sync path and avoids "STARTED forever / SUCCESS with no work" edge cases.
Also applies to: 400-410
🧹 Nitpick comments (4)
ami/jobs/models.py (1)
412-528: `MLJob.process_images` extraction looks consistent; minor polish only

The new `process_images` classmethod mirrors the existing in-process behavior and correctly:

- Respects `request_source_image_batch_size` for chunking.
- Tracks per-batch processing and save subtasks.
- Finalizes stages and job status (`JobState.SUCCESS`) and `finished_at`.

Two minor nits you might consider (non-blocking):

- The bare `# noqa` on the `chunks = ...` line has no obvious need; if it's not suppressing a specific rule anymore, drop it to avoid future RUF100 noise.
- `job.logger.warn` is deprecated in favor of `job.logger.warning` (Line 386).

Otherwise this refactor looks good and keeps the v1 path behavior intact.
# noqaon thechunks = ...line has no obvious need; if it’s not suppressing a specific rule anymore, drop it to avoid future RUF100 noise.job.logger.warnis deprecated in favor ofjob.logger.warning(Line 386).Otherwise this refactor looks good and keeps the v1 path behavior intact.
ami/ml/orchestration/jobs.py (2)
11-27: Surface cleanup result or at least log outcome fromcleanup_nats_resources
cleanup_nats_resourcescurrently discards the boolean returned bycleanup_job_resourcesand provides no logging, even though it’s intended for operational cleanup.Consider either:
- Returning the boolean so callers can act on failures, and/or
- Adding a log line using
job.loggeror this module’s logger to record success/failure.This will make diagnosing stuck NATS streams/consumers easier once this TODO is wired into job completion.
29-108: Queueing flow is solid; minor style and robustness tweaks suggestedThe
queue_images_to_natsimplementation is generally good (prepares messages outside async, initializes Redis state, and logs success/failure). A few small points:
- Line 70:
for i, (image_pk, message) in enumerate(messages):–iis unused. Rename to_or dropenumerateto satisfy Ruff’s B007 and avoid confusion.- Lines 79–81: the broad
except Exception as e:over the publish call is understandable at a network boundary, but if NATS exposes a narrower exception hierarchy you could catch those instead, or at least document whyExceptionis intentional.Example tweak for the loop:
- async with TaskQueueManager() as manager: - for i, (image_pk, message) in enumerate(messages): + async with TaskQueueManager() as manager: + for image_pk, message in messages: try: logger.info(f"Queueing image {image_pk} to stream for job '{job_id}': {message}")Overall behavior and return semantics (
Trueonly when all publishes succeed) look correct.ami/jobs/tasks.py (1)
132-149: Improve exception logging around result processing and ACKs

The catch blocks around job lookup and the main processing path currently use `logger.error` with an exception message:

- Line 129: `except Exception as ack_error: logger.error(...)`
- Line 144: `except Job.DoesNotExist: logger.error(...)`
- Line 147: `except Exception as e: logger.error(...)`

Given you immediately re-raise in each case, switching to `logger.exception` (or `job.logger.exception` where available) will include the traceback without extra code, making these failure paths much easier to debug. You can still keep the human-readable message. Example:

```diff
-    except Exception as e:
-        logger.error(f"Failed to process pipeline result for job {job_id}: {e}")
+    except Exception:
+        logger.exception(f"Failed to process pipeline result for job {job_id}")
         # Celery will automatically retry based on autoretry_for
         raise
```

Same idea applies to the ACK error and `Job.DoesNotExist` cases.
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- ami/jobs/models.py (3 hunks)
- ami/jobs/tasks.py (3 hunks)
- ami/ml/orchestration/jobs.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
ami/jobs/models.py (2)
- ami/ml/orchestration/jobs.py (1): queue_images_to_nats (29-108)
- ami/main/models.py (12): SourceImage (1666-1914), save (323-326), save (904-917), save (1139-1142), save (1482-1489), save (1581-1584), save (1872-1875), save (2153-2174), save (2390-2396), save (2569-2574), save (2857-2874), save (3494-3497)

ami/ml/orchestration/jobs.py (3)
- ami/jobs/models.py (4): Job (734-1019), logger (1004-1013), update_stage (168-188), save (954-965)
- ami/ml/orchestration/nats_queue.py (3): TaskQueueManager (28-294), cleanup_job_resources (278-294), publish_task (119-150)
- ami/ml/orchestration/task_state.py (3): TaskStateManager (17-126), cleanup (120-126), initialize_job (38-50)

ami/jobs/tasks.py (5)
- ami/jobs/views.py (1): result (256-339)
- ami/ml/orchestration/nats_queue.py (2): TaskQueueManager (28-294), acknowledge_task (210-229)
- ami/ml/orchestration/task_state.py (2): TaskStateManager (17-126), update_state (55-84)
- ami/jobs/models.py (4): Job (734-1019), logger (1004-1013), update_stage (168-188), save (954-965)
- ami/ml/models/pipeline.py (3): save_results (814-922), save_results (1120-1121), save (1128-1134)
🪛 Ruff (0.14.5)
ami/ml/orchestration/jobs.py
70-70: Loop control variable i not used within loop body
Rename unused i to _i
(B007)
79-79: Do not catch blind exception: Exception
(BLE001)
ami/jobs/tasks.py
128-128: Do not catch blind exception: Exception
(BLE001)
129-129: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
140-140: Abstract raise to an inner function
(TRY301)
144-144: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
147-147: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
🔇 Additional comments (1)
ami/jobs/tasks.py (1)
201-223: `log_time` helper is fine; behaviour matches usage

The `log_time` utility correctly:

- Uses `time.perf_counter()` for timing.
- Returns both elapsed duration and a callable that can be reused for subsequent measurements.
- Only logs when both `msg` is provided and `start > 0`, which fits the usage pattern in `process_pipeline_result`.

No changes needed here.
ami/jobs/models.py

```python
if job.project.feature_flags.async_pipeline_workers:
    queued = queue_images_to_nats(job, images)
    if not queued:
        job.logger.error("Aborting job %s because images could not be queued to NATS", job.pk)
        job.progress.update_stage("collect", status=JobState.FAILURE)
        job.update_status(JobState.FAILURE)
        job.finished_at = datetime.datetime.now()
        job.save()
        return
else:
    cls.process_images(job, images)
```
Async ML path leaves overall job lifecycle undefined (status/finished_at, Celery status mismatch)
When async_pipeline_workers is enabled, MLJob.run queues NATS tasks and returns immediately. The surrounding run_job Celery task then completes and update_job_status will set job.status to the Celery task status (SUCCESS), even though:
- No images may yet have been processed by workers.
- `finished_at` is never set for the async path.
- The only subsequent updates come from `process_pipeline_result`, which adjusts per-stage progress only (via `_update_job_progress`) and never touches `job.status` or `finished_at`.
Net effect: jobs can show SUCCESS while process/results stages are still <100%, and even once stages reach 100% there is no authoritative completion timestamp or explicit terminal status driven by the async pipeline.
Consider tightening this by:
- Having the async pipeline be the source of truth for completion, e.g. in `_update_job_progress` when `stage == "results"` and `progress_percentage >= 1.0`, set `job.status` to `JobState.SUCCESS`, set `finished_at`, and (optionally) trigger NATS/Redis cleanup.
- Optionally, for jobs with `async_pipeline_workers=True`, avoid overwriting `job.status` in `update_job_status` based solely on the `run_job` Celery task's status, or treat that status as "queueing succeeded" only.
This will make the async path match the synchronous process_images semantics and avoid confusing “SUCCESS, 0% complete” states.
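A sketch of the first option, making `_update_job_progress` the authoritative finalizer. Names follow the PR's modules (`Job`, `JobState`, `update_stage`); the exact signature of the real helper in ami/jobs/tasks.py may differ:

```python
import datetime

from ami.jobs.models import Job, JobState


def _update_job_progress(job_id: int, stage: str, progress_percentage: float) -> None:
    job = Job.objects.get(pk=job_id)
    job.progress.update_stage(stage, progress=progress_percentage)
    if stage == "results" and progress_percentage >= 1.0:
        # The async pipeline, not the run_job Celery task, declares the job done
        job.update_status(JobState.SUCCESS, save=False)
        job.finished_at = datetime.datetime.now()
        # Optionally trigger NATS/Redis cleanup here
        # (cleanup_nats_resources, TaskStateManager.cleanup)
    job.save()
```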
🤖 Prompt for AI Agents
In ami/jobs/models.py around lines 400-410, the async pipeline path only queues
work and returns, leaving job.status and finished_at unset and allowing the
Celery run_job task to mark the job SUCCESS prematurely; update the logic so
that for async_pipeline_workers you do not overwrite job.status based solely on
the run_job Celery task (treat that status as "queued" or leave unchanged), and
move authoritative completion handling into the async progress handler: in
_update_job_progress, when stage == "results" and progress_percentage >= 1.0 set
job.status = JobState.SUCCESS, set job.finished_at = now(), save the job (and
optionally perform NATS/Redis cleanup), and ensure any queued-path error
handling still sets FAILURE and finished_at as currently done.
Actionable comments posted: 1
♻️ Duplicate comments (1)
ami/ml/orchestration/nats_queue.py (1)
94-120: Detect and recreate consumer when TTR (ack_wait) changes.

The current code creates a consumer with `ack_wait=TASK_TTR` but never updates it if the consumer already exists with a different value. JetStream doesn't allow mutating `ack_wait` on existing durables, so if the consumer is created with one TTR and later code expects a different TTR, the old value persists. This causes premature timeouts and unintended redeliveries.

For example, if `reserve_task()` creates the consumer before any `publish_task()` calls, the consumer will use `TASK_TTR=300`. If future code changes expect a different TTR, the consumer retains the old value.

Check the existing consumer's `ack_wait` and recreate it when mismatched:

```diff
 async def _ensure_consumer(self, job_id: str):
     """Ensure consumer exists for the given job."""
     if self.js is None:
         raise RuntimeError("Connection is not open. Use TaskQueueManager as an async context manager.")

     stream_name = self._get_stream_name(job_id)
     consumer_name = self._get_consumer_name(job_id)
     subject = self._get_subject(job_id)

+    info = None
     try:
         info = await self.js.consumer_info(stream_name, consumer_name)
-        logger.debug(f"Consumer {consumer_name} already exists: {info}")
+        # Check if ack_wait matches the expected TTR
+        ack_wait_seconds = int(info.config.ack_wait.total_seconds())
+        if ack_wait_seconds != TASK_TTR:
+            logger.info(
+                f"Consumer {consumer_name} has ack_wait={ack_wait_seconds}s, "
+                f"expected {TASK_TTR}s. Recreating consumer."
+            )
+            await self.js.delete_consumer(stream_name, consumer_name)
+            info = None
+        else:
+            logger.debug(f"Consumer {consumer_name} already exists with correct ack_wait={ack_wait_seconds}s")
     except Exception:
-        # Consumer doesn't exist, create it
+        info = None
+
+    if info is None:
         await self.js.add_consumer(
             stream=stream_name,
             config=ConsumerConfig(
                 durable_name=consumer_name,
                 ack_policy=AckPolicy.EXPLICIT,
                 ack_wait=TASK_TTR,  # Visibility timeout (TTR)
                 max_deliver=5,  # Max retry attempts
                 deliver_policy=DeliverPolicy.ALL,
                 max_ack_pending=100,  # Max unacked messages
                 filter_subject=subject,
             ),
         )
-        logger.info(f"Created consumer {consumer_name}")
+        logger.info(f"Created consumer {consumer_name} with ack_wait={TASK_TTR}s")
```

Based on learnings
🧹 Nitpick comments (3)
ami/ml/orchestration/task_state.py (1)
26-26: Annotate STAGES with ClassVar for clarity.

The mutable class attribute should be annotated with `typing.ClassVar` to make it explicit that this is a class-level constant shared across all instances.

Apply this diff:

```diff
+from typing import ClassVar
+
 class TaskStateManager:
     """
     Manages job progress tracking state in Redis.
     Tracks pending images for jobs to calculate progress percentages
     as workers process images asynchronously.
     """

     TIMEOUT = 86400 * 7  # 7 days in seconds
-    STAGES = ["process", "results"]
+    STAGES: ClassVar[list[str]] = ["process", "results"]
```
70-70: Remove unused loop variable.The loop variable
iis not used within the loop body.Apply this diff:
- for i, (image_pk, message) in enumerate(messages): + for image_pk, message in messages:
95-107: Consider clarifying the empty-images case flow.

When `images` is empty, the job progress is updated at lines 95-97, then the function continues through the result logging logic. While correct, the flow is subtle. Consider adding an early return after line 97 to make the empty-images case explicit:

```diff
 if not images:
     job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
     job.save()
+    job.logger.info(f"No images to queue for job '{job_id}'")
+    return True

 # Log results (back in sync context)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- ami/ml/orchestration/jobs.py (1 hunks)
- ami/ml/orchestration/nats_queue.py (1 hunks)
- ami/ml/orchestration/task_state.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
ami/ml/orchestration/nats_queue.py (1)
- ami/jobs/views.py (1): result (256-339)

ami/ml/orchestration/jobs.py (3)
- ami/jobs/models.py (4): Job (734-1019), JobState (27-63), logger (1004-1013), update_stage (168-188)
- ami/ml/orchestration/nats_queue.py (3): TaskQueueManager (31-296), cleanup_job_resources (280-296), publish_task (122-152)
- ami/ml/orchestration/task_state.py (3): TaskStateManager (17-125), cleanup (119-125), initialize_job (39-49)
🪛 Ruff (0.14.5)
ami/ml/orchestration/nats_queue.py
76-76: Avoid specifying long messages outside the exception class
(TRY003)
84-84: Do not catch blind exception: Exception
(BLE001)
97-97: Avoid specifying long messages outside the exception class
(TRY003)
106-106: Do not catch blind exception: Exception
(BLE001)
134-134: Avoid specifying long messages outside the exception class
(TRY003)
148-148: Consider moving this statement to an else block
(TRY300)
150-150: Do not catch blind exception: Exception
(BLE001)
151-151: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
166-166: Avoid specifying long messages outside the exception class
(TRY003)
208-208: Do not catch blind exception: Exception
(BLE001)
209-209: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
223-223: Avoid specifying long messages outside the exception class
(TRY003)
228-228: Consider moving this statement to an else block
(TRY300)
229-229: Do not catch blind exception: Exception
(BLE001)
230-230: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
244-244: Avoid specifying long messages outside the exception class
(TRY003)
252-252: Consider moving this statement to an else block
(TRY300)
253-253: Do not catch blind exception: Exception
(BLE001)
254-254: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
268-268: Avoid specifying long messages outside the exception class
(TRY003)
275-275: Consider moving this statement to an else block
(TRY300)
276-276: Do not catch blind exception: Exception
(BLE001)
277-277: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/ml/orchestration/jobs.py
70-70: Loop control variable i not used within loop body
Rename unused i to _i
(B007)
78-78: Do not catch blind exception: Exception
(BLE001)
ami/ml/orchestration/task_state.py
26-26: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
Actionable comments posted: 5
♻️ Duplicate comments (1)
ami/jobs/tasks.py (1)
160-166: Use timezone-aware datetime for `finished_at`.

Line 163 uses `datetime.now()`, which returns a naive (non-timezone-aware) datetime. Django models typically expect timezone-aware datetimes, especially when `USE_TZ = True`. This can cause subtle bugs or database inconsistencies.

Apply this diff to use Django's timezone-aware datetime:

```diff
-from datetime import datetime
+from django.utils import timezone
```

And then:

```diff
     if stage == "results" and progress_percentage >= 1.0:
         job.status = JobState.SUCCESS
         job.progress.summary.status = JobState.SUCCESS
-        job.finished_at = datetime.now()
+        job.finished_at = timezone.now()
```
🧹 Nitpick comments (8)
ami/ml/orchestration/jobs.py (4)
11-26: Previous issue addressed; update docstring to document the return value.

The function now correctly returns the cleanup result (addressing the previous review comment), but the docstring is missing the `Returns:` section.

Apply this diff to complete the docstring:

```diff
 def cleanup_nats_resources(job: "Job") -> bool:
     """
     Clean up NATS JetStream resources (stream and consumer) for a completed job.

     Args:
         job: The Job instance
+
+    Returns:
+        bool: True if cleanup successful, False otherwise
     """
```
72-72: Consider using `job.logger` for consistency.

This line uses the module-level `logger`, while other logging in this function uses `job.logger`. Using `job.logger` consistently ensures all logs are captured by the JobLogHandler.

```diff
-            logger.info(f"Queueing image {image_pk} to stream for job '{job_id}': {message}")
+            job.logger.info(f"Queueing image {image_pk} to stream for job '{job_id}': {message}")
```

Note: You'll need to pass `job` into the async function or capture it from the outer scope (which should work since it's already in scope).
95-97: Consider the NATS resource lifecycle for empty jobs.

When there are no images, the job progress is updated to complete, but should NATS stream/consumer resources be created at all, or immediately cleaned up if already created?

This could lead to orphaned NATS resources. Consider:

- Skipping NATS resource creation entirely when `len(images) == 0` (see the sketch after this list), or
- Calling `cleanup_nats_resources(job)` immediately after marking the job complete.
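A minimal sketch of the first option, assuming the `queue_images_to_nats` entry point and stage names from this PR; the early return is the suggested change, not existing behavior:

```python
from ami.jobs.models import JobState  # import path used elsewhere in this PR


def queue_images_to_nats(job, images) -> bool:
    if not images:
        # Nothing to queue: mark the job complete without creating any
        # NATS stream/consumer or Redis state for it
        job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
        job.save()
        return True
    # ... create the job's stream/consumer and publish one task per image ...
```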
103-106: Document partial failure behavior.

When some images fail to queue, the function returns `False`, but successfully queued images remain in the NATS stream. This could leave the job in an inconsistent state.

Consider documenting this behavior in the docstring (a sketch follows below), or implement one of these strategies:

- Return success if at least some images were queued, and let the job track partial completion
- Implement a rollback mechanism to remove successfully queued messages when any failure occurs
- Add retry logic for failed queue operations

The current behavior (returning `False` on any failure) might trigger job-level error handling that doesn't account for partially queued work.
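A sketch of the documentation option; the wording is illustrative, not the PR's actual docstring:

```python
def queue_images_to_nats(job, images) -> bool:
    """Queue one NATS task per image for the given job.

    Returns:
        bool: True only if every image was queued successfully. On partial
        failure this returns False, but messages that were already published
        remain in the job's stream and will still be delivered to workers.
    """
```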
ami/jobs/tasks.py (4)
126-128: Use `logging.exception` to capture the traceback automatically.

When logging an exception in an except block, `logging.exception` is preferred over `logging.error` because it automatically includes the stack trace, making debugging easier.

Apply this diff:

```diff
         except Exception as ack_error:
-            job.logger.error(f"Error acknowledging task via NATS: {ack_error}")
+            job.logger.exception(f"Error acknowledging task via NATS: {ack_error}")
             # Don't fail the task if ACK fails - data is already saved
```
141-147: Use `logging.exception` in exception handlers.

Similar to the NATS ACK handler, these exception handlers should use `logging.exception` instead of `logging.error` to automatically capture stack traces for easier debugging.

Apply this diff:

```diff
     except Job.DoesNotExist:
-        logger.error(f"Job {job_id} not found")
+        logger.exception(f"Job {job_id} not found")
         raise
     except Exception as e:
-        logger.error(f"Failed to process pipeline result for job {job_id}: {e}")
+        logger.exception(f"Failed to process pipeline result for job {job_id}: {e}")
         # Celery will automatically retry based on autoretry_for
         raise
```
160-166: Clean up NATS queues and Redis state when the job completes.

When the job reaches 100% completion (lines 160-163), there's no call to clean up the NATS JetStream queue or the Redis state managed by `TaskStateManager`. This was mentioned in previous reviews and is listed as a TODO in the PR objectives.

Without cleanup:

- NATS queues will accumulate over time
- Redis keys persist beyond their usefulness
- Resource leaks could degrade system performance

Would you like me to generate a cleanup implementation that could be called here? It would need to (a rough sketch follows this list):

- Delete the NATS stream for this job
- Call `TaskStateManager(job_id).cleanup()` to remove Redis keys
- Include error handling to avoid failing job finalization if cleanup fails
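A rough sketch of such a hook, assuming the `cleanup_nats_resources` helper and `TaskStateManager` API referenced elsewhere in this review; the function name `_cleanup_job_queue_state` is hypothetical:

```python
from ami.ml.orchestration.jobs import cleanup_nats_resources
from ami.ml.orchestration.task_state import TaskStateManager


def _cleanup_job_queue_state(job) -> None:
    """Best-effort cleanup of NATS and Redis resources for a finished job."""
    try:
        if not cleanup_nats_resources(job):  # deletes the job's stream and consumer
            job.logger.warning(f"NATS cleanup reported failure for job {job.pk}")
        TaskStateManager(job.pk).cleanup()  # removes the job's Redis keys
    except Exception:
        # Never fail job finalization because cleanup failed
        job.logger.exception(f"Error cleaning up queue state for job {job.pk}")
```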
80-86: Consider extracting the duplicated retry logic (optional).

The pattern of calling `update_state`, checking for `None`, logging a warning, and retrying with the same parameters appears at both lines 80-86 and lines 131-138. You could extract this to a small helper function to reduce duplication.

Example helper:

```python
def _update_state_with_retry(state_manager, processed_image_ids, stage, request_id, task):
    progress_info = state_manager.update_state(processed_image_ids, stage=stage, request_id=request_id)
    if not progress_info:
        logger.warning(
            f"Another task is already processing results for job {state_manager.job_id}. "
            f"Retrying task {request_id} in 5 seconds..."
        )
        raise task.retry(countdown=5, max_retries=10)
    return progress_info
```

Then use:

```python
progress_info = _update_state_with_retry(state_manager, processed_image_ids, "process", self.request.id, self)
```

However, since this is only used twice and the logic is straightforward, the current approach is also acceptable.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `ami/jobs/tasks.py` (3 hunks)
- `ami/ml/orchestration/jobs.py` (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
ami/jobs/tasks.py (5)
ami/jobs/views.py (2)
- `result` (256-339)
- `retry` (114-125)

ami/ml/orchestration/nats_queue.py (2)

- `TaskQueueManager` (31-296)
- `acknowledge_task` (212-231)

ami/ml/orchestration/task_state.py (2)

- `TaskStateManager` (17-125)
- `update_state` (54-83)

ami/jobs/models.py (4)

- `Job` (734-1019)
- `logger` (1004-1013)
- `retry` (873-885)
- `save` (954-965)

ami/ml/models/pipeline.py (3)

- `save_results` (814-922)
- `save_results` (1120-1121)
- `save` (1128-1134)
ami/ml/orchestration/jobs.py (3)
ami/jobs/models.py (4)
- `Job` (734-1019)
- `logger` (1004-1013)
- `update_stage` (168-188)
- `save` (954-965)

ami/ml/orchestration/nats_queue.py (3)

- `TaskQueueManager` (31-296)
- `cleanup_job_resources` (280-296)
- `publish_task` (122-152)

ami/ml/orchestration/task_state.py (3)

- `TaskStateManager` (17-125)
- `cleanup` (119-125)
- `initialize_job` (39-49)
🪛 Ruff (0.14.5)
ami/jobs/tasks.py
126-126: Do not catch blind exception: Exception
(BLE001)
127-127: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
138-138: Abstract raise to an inner function
(TRY301)
142-142: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
145-145: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
ami/ml/orchestration/jobs.py
70-70: Loop control variable i not used within loop body
Rename unused i to _i
(B007)
78-78: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
🔇 Additional comments (1)
ami/jobs/tasks.py (1)
203-225: LGTM! Clean timing utility.

The `log_time` helper is well-designed with clear semantics. Using `time.perf_counter()` is the right choice for measuring elapsed time, and the partial-function pattern makes it convenient to chain timing measurements. The docstring with usage examples is helpful.
Actionable comments posted: 0
♻️ Duplicate comments (1)
ami/ml/orchestration/jobs.py (1)
46-58: Use timezone-aware datetime for `queue_timestamp`.

Line 56 uses `datetime.datetime.now()`, which returns a naive (timezone-unaware) datetime. Django applications should use timezone-aware timestamps to avoid comparison and storage issues.

Apply this diff to fix:

```diff
+from django.utils import timezone
+
 def queue_images_to_nats(job: "Job", images: list[SourceImage]):
     ...
             message = {
                 "job_id": job.pk,
                 "image_id": image_id,
                 "image_url": image.url() if hasattr(image, "url") else None,
                 "timestamp": (image.timestamp.isoformat() if hasattr(image, "timestamp") and image.timestamp else None),
                 "batch_index": i,
                 "total_images": len(images),
-                "queue_timestamp": datetime.datetime.now().isoformat(),
+                "queue_timestamp": timezone.now().isoformat(),
             }
```

You may also be able to remove the `datetime` import from line 1 if it's not used elsewhere.
🧹 Nitpick comments (2)
ami/ml/orchestration/jobs.py (2)
11-26: LGTM - cleanup function properly returns its result.

The function correctly returns the boolean result from the async cleanup operation (line 26), which allows callers to handle success/failure. Good fix from the previous review.
Regarding the TODO comment at line 11: I can help implement the cleanup mechanism if needed. This could be triggered from job completion handlers or as part of a Celery task. Would you like me to open an issue to track this, or would you prefer a code suggestion?
88-102: LGTM - clean result handling.

The direct unpacking at line 88 is correct (the previous `None` check issue was properly resolved). The logic clearly returns `False` for any failures and `True` for complete success.

Lines 90-92 handle the edge case of an empty images list by marking the job complete. While this seems intentional and reasonable, consider adding a brief comment explaining this behavior for future maintainers.

Optional: Add explanatory comment:

```diff
     successful_queues, failed_queues = async_to_sync(queue_all_images)()

     if not images:
+        # No images to process - mark job as complete immediately
         job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
         job.save()
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `ami/ml/orchestration/jobs.py` (1 hunks)
- `docker-compose.yml` (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- docker-compose.yml
🧰 Additional context used
🧬 Code graph analysis (1)
ami/ml/orchestration/jobs.py (3)
ami/jobs/models.py (5)
- `Job` (734-1019)
- `JobState` (27-63)
- `logger` (1004-1013)
- `update_stage` (168-188)
- `save` (954-965)

ami/ml/orchestration/nats_queue.py (3)

- `TaskQueueManager` (31-296)
- `cleanup_job_resources` (280-296)
- `publish_task` (122-152)

ami/ml/orchestration/task_state.py (3)

- `TaskStateManager` (17-125)
- `cleanup` (119-125)
- `initialize_job` (39-49)
🪛 Ruff (0.14.5)
ami/ml/orchestration/jobs.py
77-77: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
🔇 Additional comments (2)
ami/ml/orchestration/jobs.py (2)
60-63: LGTM - proper Redis state initialization.

Good pattern to initialize the task state manager with all image IDs before queuing begins. This ensures progress tracking is ready before any tasks are processed.
65-86: LGTM - batch processing with appropriate error handling.

The broad `Exception` catch at line 77 is flagged by static analysis, but it's actually appropriate in this context. The batch-processing pattern here requires continuing through all images even if individual ones fail, and the exception is logged (line 78) while tracking success/failure counts. This provides resilience for partial failures; a sketch of the pattern follows below.
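For illustration, a sketch of that continue-on-error shape (`publish_task` is the PR's TaskQueueManager method; the surrounding function and names are assumed):

```python
async def queue_batch(manager, job, messages) -> tuple[list[int], list[int]]:
    """Publish each (image_pk, message) pair; never abort the batch on one failure."""
    successful, failed = [], []
    for image_pk, message in messages:
        try:
            await manager.publish_task(str(job.pk), message)
            successful.append(image_pk)
        except Exception:
            # Log and continue so one bad image doesn't abort the whole batch
            job.logger.exception(f"Failed to queue image {image_pk}")
            failed.append(image_pk)
    return successful, failed
```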
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Actionable comments posted: 2
🧹 Nitpick comments (1)
ami/jobs/tasks.py (1)
124-125: Consider using `logging.exception` for better debugging context.

At lines 125, 140, and 143, the code uses `logging.error()` within exception handlers. Using `logging.exception()` would automatically include the stack trace, making debugging easier without changing the log level.

Example for line 125:

```diff
     except Exception as ack_error:
-        job.logger.error(f"Error acknowledging task via NATS: {ack_error}")
+        job.logger.exception(f"Error acknowledging task via NATS: {ack_error}")
         # Don't fail the task if ACK fails - data is already saved
```

Apply similar changes to lines 140 and 143.
Also applies to: 140-140, 143-143
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- `ami/jobs/tasks.py` (3 hunks)
- `ami/ml/orchestration/jobs.py` (1 hunks)
- `docker-compose.yml` (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- docker-compose.yml
🧰 Additional context used
🧬 Code graph analysis (2)
ami/ml/orchestration/jobs.py (3)
ami/jobs/models.py (4)
- `Job` (734-1019)
- `logger` (1004-1013)
- `update_stage` (168-188)
- `save` (954-965)

ami/ml/orchestration/nats_queue.py (3)

- `TaskQueueManager` (31-296)
- `cleanup_job_resources` (280-296)
- `publish_task` (122-152)

ami/ml/orchestration/task_state.py (3)

- `TaskStateManager` (17-125)
- `cleanup` (119-125)
- `initialize_job` (39-49)
ami/jobs/tasks.py (5)
ami/jobs/views.py (1)
- `result` (256-339)

ami/ml/orchestration/nats_queue.py (2)

- `TaskQueueManager` (31-296)
- `acknowledge_task` (212-231)

ami/ml/orchestration/task_state.py (2)

- `TaskStateManager` (17-125)
- `update_state` (54-83)

ami/jobs/models.py (4)

- `Job` (734-1019)
- `logger` (1004-1013)
- `update_stage` (168-188)
- `save` (954-965)

ami/ml/models/pipeline.py (3)

- `save_results` (814-922)
- `save_results` (1120-1121)
- `save` (1128-1134)
🪛 Ruff (0.14.5)
ami/ml/orchestration/jobs.py
76-76: Do not catch blind exception: Exception
(BLE001)
ami/jobs/tasks.py
124-124: Do not catch blind exception: Exception
(BLE001)
125-125: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
136-136: Abstract raise to an inner function
(TRY301)
140-140: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
143-143: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
🔇 Additional comments (4)
ami/ml/orchestration/jobs.py (1)
11-25: LGTM: Cleanup function correctly returns success status.

The async cleanup flow properly wraps the `TaskQueueManager` call and returns the boolean result, allowing callers to handle success/failure appropriately.
ami/jobs/tasks.py (3)
67-77: LGTM: Error handling correctly marks failed images as processed.

The code properly extracts the failed image ID and marks it as processed (lines 74-75), preventing the job from hanging indefinitely when images fail. This addresses the earlier review feedback.
111-127: LGTM: NATS acknowledgment flow is robust.

The acknowledgment logic correctly:
- Acknowledges tasks even if processing errored (preventing infinite redelivery)
- Handles ACK failures gracefully without failing the task (line 126 comment)
- Uses proper async/sync bridging
The broad exception catch on line 124 is intentional and well-documented.
201-223: LGTM: Useful timing utility.

The `log_time` helper provides a clean pattern for measuring and logging execution times with partial function application. Well-documented with clear usage examples. A self-contained sketch of the pattern follows below.
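For readers unfamiliar with the pattern, here is a self-contained sketch of a `log_time`-style helper; it illustrates the chaining idea and is not the PR's exact implementation:

```python
import logging
import time
from functools import partial

logger = logging.getLogger(__name__)


def log_time(label: str, start: float | None = None) -> partial:
    """Log the time elapsed since `start` (if given) and return a partial
    anchored at the current time so measurements can be chained."""
    now = time.perf_counter()
    if start is not None:
        logger.info(f"{label}: {now - start:.3f}s elapsed")
    return partial(log_time, start=now)


mark = log_time("start")        # no log yet; anchors the first checkpoint
# ... fetch images ...
mark = mark("fetched images")   # logs elapsed since "start", re-anchors
# ... save results ...
mark("saved results")           # logs elapsed since "fetched images"
```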
```python
def _update_job_progress(job_id: int, stage: str, progress_percentage: float) -> None:
    from ami.jobs.models import Job, JobState  # avoid circular import

    with transaction.atomic():
        job = Job.objects.select_for_update().get(pk=job_id)
        job.progress.update_stage(
            stage,
            status=JobState.SUCCESS if progress_percentage >= 1.0 else JobState.STARTED,
            progress=progress_percentage,
        )
        if stage == "results" and progress_percentage >= 1.0:
            job.status = JobState.SUCCESS
            job.progress.summary.status = JobState.SUCCESS
            job.finished_at = datetime.now()
        job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%")
        job.save()
```
Use timezone-aware datetime for job completion timestamp.
Line 161 uses `datetime.now()`, which returns a naive datetime. Django applications should use timezone-aware timestamps to avoid timezone-related bugs and inconsistencies.
Apply this diff:
```diff
 def _update_job_progress(job_id: int, stage: str, progress_percentage: float) -> None:
     from ami.jobs.models import Job, JobState  # avoid circular import
+    from django.utils import timezone

     with transaction.atomic():
         job = Job.objects.select_for_update().get(pk=job_id)
         job.progress.update_stage(
             stage,
             status=JobState.SUCCESS if progress_percentage >= 1.0 else JobState.STARTED,
             progress=progress_percentage,
         )
         if stage == "results" and progress_percentage >= 1.0:
             job.status = JobState.SUCCESS
             job.progress.summary.status = JobState.SUCCESS
-            job.finished_at = datetime.now()
+            job.finished_at = timezone.now()
         job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%")
         job.save()
```

🤖 Prompt for AI Agents
In ami/jobs/tasks.py around lines 148-163, replace the naive datetime.now() used
to set job.finished_at with a timezone-aware timestamp by using
django.utils.timezone.now(); add an import for timezone (either at the module
top: from django.utils import timezone, or inside the function alongside the
existing local import) and set job.finished_at = timezone.now().
```python
    if not images:
        job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
        job.save()
```
Consider updating both process and results stages for empty image batches.
When the images list is empty, only the "results" stage is marked as SUCCESS. The "process" stage remains uninitialized or incomplete, which could leave the job in an inconsistent state. While the Redis state would show 100% progress (empty list), the job's progress stages may not reflect this.
Consider updating both stages when there are no images to process:
```diff
     if not images:
+        job.progress.update_stage("process", status=JobState.SUCCESS, progress=1.0)
         job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
         job.save()
```

🤖 Prompt for AI Agents
In ami/ml/orchestration/jobs.py around lines 89 to 92, when images is empty only
the "results" stage is set to SUCCESS which leaves the "process" stage
uninitialized/incomplete; modify the empty-list branch to also update the
"process" stage to status=JobState.SUCCESS with progress=1.0 (and persist with
job.save()) so both stages reflect completed progress and the job state remains
consistent.
```python
        messages.append((image.pk, message))

    # Store all image IDs in Redis for progress tracking
    state_manager = TaskStateManager(job.pk)
```
@vanessavmac you may be interested in looking into the details of the TaskStateManager
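For context, a rough sketch of the `TaskStateManager` lifecycle as it appears in this review (`initialize_job`, `update_state`, `cleanup`); the exact signatures and return shapes are assumptions:

```python
from ami.ml.orchestration.task_state import TaskStateManager

# Given: job, images, processed_image_ids, task_id from the surrounding code.
state = TaskStateManager(job.pk)
state.initialize_job([img.pk for img in images])  # seed pending image IDs in Redis

# Later, in the Celery result task, after a worker's batch is saved:
progress_info = state.update_state(processed_image_ids, stage="process", request_id=task_id)
if progress_info is None:
    # Another task holds the lock for this job; retry shortly
    ...

# Once the job is fully complete:
state.cleanup()  # delete the job's Redis keys
```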
Summary
Initial version of the Processing service V2.
Current state
The V2 path is working but disabled in this PR pending the compatibility work. When enabled, starting a job will create a queue for that job and populate it with one task per image. The tasks can be pulled and ACKed via the APIs below; a hypothetical worker loop is sketched next.
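To make the pull-based flow concrete, here is a hypothetical worker loop against the endpoints listed under "List of Changes" (host, job id, and payload shape are illustrative, not the final API contract):

```python
import requests

BASE = "http://localhost:8000/api/v2"

# 1. Find jobs for a pipeline this worker can process
job_ids = requests.get(f"{BASE}/jobs", params={"pipeline": 11, "ids_only": 1}).json()

for job_id in job_ids:
    # 2. Reserve a batch of tasks; each task corresponds to one source image
    tasks = requests.get(f"{BASE}/jobs/{job_id}/tasks", params={"batch": 4}).json()
    for task in tasks:
        results = run_pipeline(task)  # hypothetical: run the ML pipeline on task["image_url"]
        # 3. Submit results; the backend saves them and ACKs the NATS message
        requests.post(f"{BASE}/jobs/{job_id}/result/", json=results)
```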
List of Changes
- Added `pipeline` and `id_only` parameters to the `/jobs` API. This will allow workers to query for jobs for pipelines they can process. E.g. `localhost:8000/api/v2/jobs?pipeline=11&ids_only=1`
- New endpoint to pull a batch of tasks for a job: `localhost:8000/api/v2/jobs/11/tasks?batch=4`
- New endpoint to submit results for a job: `localhost:8000/api/v2/jobs/11/result/`
- Debug configurations for attaching to the `Django` or `celery worker` containers

TODOs:
Related Issues
See issues #970 and #971.
How to Test the Changes
This path can be enabled by turning on the `job.project.feature_flags.async_pipeline_workers` feature flag (see `ami/jobs/models.py:400`) and running the `ami worker` from RolnickLab/ami-data-companion#94.
Test both modes by tweaking the flag in the django admin console:

Deployment Notes
Checklist
Summary by CodeRabbit
New Features
Documentation
Chores