Conversation

@carlosgjs (Collaborator) commented Oct 8, 2025

Summary

Initial version of the Processing service V2.

Current state

The V2 path is working but disabled in this PR pending the compatibility work. When enabled, starting a job creates a queue for that job and populates it with one task per image. Tasks can then be pulled and ACKed via the APIs below.

List of Changes

  • Add pipeline and ids_only parameters to the /jobs API. This allows workers to query for jobs belonging to pipelines they can process. E.g. localhost:8000/api/v2/jobs?pipeline=11&ids_only=1
  • Add an API for pulling tasks from a job queue: localhost:8000/api/v2/jobs/11/tasks?batch=4
  • Add an API for posting results: localhost:8000/api/v2/jobs/11/result/ (a worker-loop sketch against these endpoints follows this list)
  • Added NATS JetStream to the docker compose. I also tried RabbitMQ and Beanstalkd, but they don't support the visibility timeout semantics we want or a disconnected mode of pulling and ACKing tasks.
  • Added debug targets for attaching to either the Django or celeryworker containers
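A minimal sketch of the intended worker loop against these endpoints (auth is omitted, and payload field names such as image_url and reply_subject are illustrative assumptions, not confirmed by this PR):

import requests

BASE = "http://localhost:8000/api/v2"
PIPELINE_ID = 11  # hypothetical pipeline this worker can process

def run_pipeline(image_url: str) -> dict:
    """Placeholder for worker-side ML inference."""
    return {"detections": []}

# Find jobs for pipelines this worker supports; ids_only returns just the PKs.
job_ids = requests.get(f"{BASE}/jobs", params={"pipeline": PIPELINE_ID, "ids_only": 1}).json()

for job_id in job_ids:
    # Reserve up to 4 tasks from the job's queue.
    tasks = requests.get(f"{BASE}/jobs/{job_id}/tasks", params={"batch": 4}).json()
    for task in tasks:
        result = run_pipeline(task["image_url"])
        # Post the result; the server saves it and ACKs the queued NATS
        # message via the task's reply subject.
        requests.post(
            f"{BASE}/jobs/{job_id}/result/",
            json={"reply_subject": task["reply_subject"], "result": result},
        )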

TODOs:

  • A mechanism by which jobs can run using either v1 (push) or v2 (pull) processing services
  • Proper cleanup of queues after jobs complete
  • Unit tests for the new code paths

Related Issues

See issues #970 and #971.

How to Test the Changes

This path can be enabled by turning on the job.project.feature_flags.async_pipeline_workers feature flag; see ami/jobs/models.py:400:

        if job.project.feature_flags.async_pipeline_workers:
            cls.queue_images_to_nats(job, images)
        else:
            cls.process_images(job, images)

Then run the ami worker from RolnickLab/ami-data-companion#94.

Test


Test both modes by tweaking the flag in the Django admin console.
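For scripted testing, a Django shell alternative (a minimal sketch, assuming feature_flags is a mutable object persisted on the Project model, per the ProjectFeatureFlags change in this PR):

# docker compose exec django python manage.py shell
from ami.main.models import Project

project = Project.objects.get(pk=1)  # hypothetical project ID
project.feature_flags.async_pipeline_workers = True  # or False for the v1 path
project.save()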

Deployment Notes

Checklist

  • I have tested these changes appropriately.
  • I have added and/or modified relevant tests.
  • I updated relevant documentation or comments.
  • I have verified that this PR follows the project's coding standards.
  • Any dependent changes have already been merged to main.

Summary by CodeRabbit

  • New Features

    • Optional async pipeline workers with queued task processing, background result handling, NATS-backed task queue, and Redis-backed per-image task state tracking.
    • New endpoints to fetch queued job tasks and to submit pipeline results.
    • Enhanced job listing filters: pipeline filter, ids-only, incomplete-only, and batch sizing.
  • Documentation

    • Quick Start: added NATS dashboard URL and localhost note.
  • Chores

    • Local NATS service added to dev compose, NATS URL env var and nats-py dependency included.


netlify bot commented Oct 8, 2025

Deploy Preview for antenna-preview canceled.

🔨 Latest commit: f221a1a
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-preview/deploys/6920f664d6d70f000828cd02

@mihow (Collaborator) commented Oct 8, 2025

Exciting!

@carlosgjs carlosgjs marked this pull request as ready for review October 24, 2025 18:49
Copilot AI review requested due to automatic review settings October 24, 2025 18:49
Copilot AI (Contributor) left a comment

Pull Request Overview

This PR introduces Processing Service V2, enabling a pull-based task queue architecture using NATS JetStream instead of the push-based Celery approach. Workers can now pull tasks via HTTP endpoints, process them independently, and acknowledge completion without maintaining persistent connections.

Key changes:

  • Added NATS JetStream integration for distributed task queuing with configurable visibility timeouts
  • Introduced new REST API endpoints for task pulling (/jobs/{id}/tasks) and result submission (/jobs/{id}/result)
  • Implemented Redis-based progress tracking to handle asynchronous worker updates

Reviewed Changes

Copilot reviewed 15 out of 16 changed files in this pull request and generated 11 comments.

Summary per file:

  • requirements/base.txt: Added nats-py dependency for NATS client support
  • object_model_diagram.md: Added comprehensive Mermaid diagram documenting the ML pipeline system architecture
  • docker-compose.yml: Added NATS JetStream service with health checks and monitoring
  • config/settings/base.py: Added NATS_URL configuration setting
  • ami/utils/nats_queue.py: New TaskQueueManager class for NATS JetStream operations
  • ami/jobs/views.py: Added task pulling and result submission endpoints with pipeline filtering
  • ami/jobs/utils.py: Helper function for running async code in a sync Django context
  • ami/jobs/tasks.py: New Celery task for processing pipeline results asynchronously
  • ami/jobs/task_state.py: TaskStateManager for Redis-based job progress tracking
  • ami/jobs/models.py: Added queue_images_to_nats method and NATS cleanup logic
  • ami/base/views.py: Fixed request.data handling when not a dict
  • README.md: Added NATS dashboard documentation link
  • .vscode/launch.json: Added debug configurations for Django and Celery containers
  • .envs/.local/.django: Added NATS_URL environment variable
  • .dockerignore: Expanded with comprehensive ignore patterns
Comments suppressed due to low confidence (1)

object_model_diagram.md:1

  • The comment at line 13 appears to be template text from instructions rather than actual documentation content. This namedtuple field description doesn't match the file's purpose as an object model diagram.
# Object Model Diagram: ML Pipeline System


@carlosgjs carlosgjs requested a review from mihow October 24, 2025 18:59
coderabbitai bot (Contributor) commented Oct 31, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds NATS JetStream queuing and management, Redis-backed per-job task state, a Celery result-processing task, an async ML job execution path behind a feature flag, API endpoints to reserve/submit tasks, Docker/env/docs entries for NATS, and orchestration helpers.

Changes

  • Configuration & Environment (.envs/.local/.django, config/settings/base.py, requirements/base.txt): Add a NATS_URL env entry and default setting; add the nats-py==2.10.0 dependency.
  • Infrastructure & Documentation (docker-compose.yml, README.md): Add the nats service (JetStream + monitoring), include nats in django's depends_on, adjust the ui-dev entrypoint, and document the NATS dashboard URL.
  • Feature Flags (ami/main/models.py): Add async_pipeline_workers: bool = False to ProjectFeatureFlags.
  • NATS Queue Manager (ami/ml/orchestration/nats_queue.py): New TaskQueueManager with NATS connection/JetStream handling, per-job stream/consumer setup, publish/reserve/ack, and resource cleanup (see the sketch after this list).
  • Orchestration Helpers (ami/ml/orchestration/jobs.py): New queue_images_to_nats(job, images) and cleanup_nats_resources(job); prepares messages, initializes Redis task state, publishes tasks, and returns a success boolean.
  • Task State Management (ami/ml/orchestration/task_state.py): New TaskStateManager and TaskProgress for Redis-backed per-job/per-stage pending lists, progress calculation, locking, and cleanup.
  • Job Model Changes (ami/jobs/models.py): MLJob.run now branches on project.feature_flags.async_pipeline_workers; added an MLJob.process_images classmethod for the synchronous path and typed image lists.
  • Celery Tasks & Helpers (ami/jobs/tasks.py): Add the Celery task process_pipeline_result and helpers log_time and _update_job_progress; integrate TaskStateManager, result saving, timing/logging, and the NATS ACK flow.
  • API Filtering & Endpoints (ami/jobs/views.py, ami/utils/requests.py): Add JobFilterSet, support ids_only / incomplete_only query params and a batch OpenAPI param; add GET /jobs/{id}/tasks to reserve NATS tasks and POST /jobs/{id}/result to queue result processing.
  • Module Init / Refactor (ami/ml/orchestration/__init__.py): Remove the unconditional re-export from .processing and add an explanatory comment about the circular import.
  • Misc docs / env (README.md, .envs/.local/.django): Document the NATS dashboard URL and add a NATS_URL example in the local env file.
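For orientation, a minimal standalone sketch of the JetStream pattern TaskQueueManager wraps: a per-job stream, a durable pull consumer whose ack_wait acts as the visibility timeout, and publish/fetch/ack. Stream, subject, and consumer names here are illustrative, not the ones used in ami/ml/orchestration/nats_queue.py.

import asyncio

import nats
from nats.js.api import ConsumerConfig

async def demo(job_id: int = 11) -> None:
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Per-job stream and subject (illustrative names).
    stream, subject = f"job-{job_id}", f"job.{job_id}.tasks"
    await js.add_stream(name=stream, subjects=[subject])

    # Publish one task per image.
    await js.publish(subject, b'{"image_id": 123}')

    # Durable pull consumer; ack_wait is the "visibility timeout", so
    # unacknowledged messages are redelivered after 300 seconds.
    psub = await js.pull_subscribe(subject, durable="workers", config=ConsumerConfig(ack_wait=300))
    msgs = await psub.fetch(batch=4, timeout=5)
    for msg in msgs:
        # ... process the task, then acknowledge it ...
        await msg.ack()

    await nc.close()

asyncio.run(demo())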

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant MLJob
    participant Flags as FeatureFlags
    participant QueueMgr as TaskQueueManager
    participant State as TaskStateManager
    participant Worker
    participant Celery as process_pipeline_result
    participant DB as Database

    Client->>MLJob: run(job, images)
    MLJob->>Flags: check async_pipeline_workers
    alt async enabled
        MLJob->>State: initialize_job(image_ids)
        MLJob->>QueueMgr: queue_images_to_nats(job, images)
        loop per image
            QueueMgr->>QueueMgr: publish_task(job_id, message)
        end
        QueueMgr-->>MLJob: return success/failure
    else sync path
        MLJob->>MLJob: process_images(job, images)
        MLJob->>DB: persist results & progress
    end

    Note over Worker,QueueMgr: Workers later reserve and process queued tasks
    Worker->>QueueMgr: reserve_task(job_id, batch)
    Worker->>Celery: invoke pipeline, produce result + reply_subject
    Celery->>State: update_state(processed_ids, "process", request_id)
    Celery->>DB: save pipeline results
    Celery->>QueueMgr: acknowledge_task(reply_subject)
    Celery->>State: update_state(processed_ids, "results", request_id)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45–75 minutes

  • Pay extra attention to JetStream resource names, consumer configuration, and error handling in ami/ml/orchestration/nats_queue.py.
  • Verify Redis locking, TTL, and atomic progress updates in ami/ml/orchestration/task_state.py.
  • Inspect async↔sync boundaries (async_to_sync) and ORM safety in ami/ml/orchestration/jobs.py and ami/jobs/tasks.py.
  • Review Celery retry/backoff semantics, ACK handling, and concurrent progress update race conditions around _update_job_progress.

Suggested reviewers

  • mihow

Poem

🐰 Hopping messages into the stream,
I tuck each image in Redis dream,
JetStream hums as workers pry,
Celery nods when results fly by,
Pipelines queue, process, and gleam. ✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 62.86%, below the required threshold of 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
  • Title check ✅ Passed: The PR title '[integration] Processing service V2' clearly describes the main change: introducing the V2 processing service with NATS integration, matching the primary objectives of this PR.
  • Description check ✅ Passed: The PR description is largely complete, covering summary, list of changes, related issues, how to test, and deployment notes. However, the 'Deployment Notes' section is empty and unit tests are incomplete (acknowledged in TODOs).

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 5

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ef8f16c and d254867.

📒 Files selected for processing (19)
  • .dockerignore (1 hunks)
  • .envs/.local/.django (1 hunks)
  • .gitignore (1 hunks)
  • .vscode/launch.json (1 hunks)
  • README.md (1 hunks)
  • ami/base/views.py (1 hunks)
  • ami/jobs/models.py (8 hunks)
  • ami/jobs/tasks.py (2 hunks)
  • ami/jobs/views.py (3 hunks)
  • ami/main/models.py (1 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
  • ami/ml/orchestration/nats_queue.py (1 hunks)
  • ami/ml/orchestration/task_state.py (1 hunks)
  • ami/ml/orchestration/utils.py (1 hunks)
  • ami/utils/requests.py (2 hunks)
  • config/settings/base.py (2 hunks)
  • docker-compose.yml (4 hunks)
  • object_model_diagram.md (1 hunks)
  • requirements/base.txt (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
ami/ml/orchestration/nats_queue.py (1)
ami/jobs/views.py (1)
  • result (256-339)
ami/ml/orchestration/task_state.py (1)
ami/ml/orchestration/jobs.py (1)
  • cleanup (20-23)
ami/jobs/views.py (3)
ami/jobs/tasks.py (1)
  • process_pipeline_result (45-138)
ami/jobs/models.py (4)
  • Job (727-1012)
  • JobState (27-63)
  • logger (997-1006)
  • final_states (58-59)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (28-294)
  • reserve_task (152-208)
ami/jobs/tasks.py (5)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (28-294)
  • acknowledge_task (210-229)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-97)
  • mark_images_processed (48-61)
  • get_progress (63-90)
ami/ml/orchestration/utils.py (1)
  • run_in_async_loop (8-18)
ami/jobs/models.py (5)
  • Job (727-1012)
  • JobState (27-63)
  • logger (997-1006)
  • update_stage (168-188)
  • save (947-958)
ami/ml/models/pipeline.py (3)
  • save (1115-1121)
  • save_results (809-917)
  • save_results (1107-1108)
ami/ml/orchestration/jobs.py (4)
ami/jobs/models.py (2)
  • Job (727-1012)
  • logger (997-1006)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (28-294)
  • cleanup_job_resources (278-294)
  • publish_task (119-150)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-97)
  • cleanup (92-97)
  • initialize_job (38-46)
ami/ml/orchestration/utils.py (1)
  • run_in_async_loop (8-18)
ami/ml/orchestration/utils.py (1)
ami/jobs/models.py (1)
  • logger (997-1006)
ami/base/views.py (1)
ami/main/api/views.py (1)
  • get (1595-1651)
ami/jobs/models.py (3)
ami/ml/orchestration/jobs.py (1)
  • queue_images_to_nats (28-107)
ami/main/models.py (1)
  • SourceImage (1622-1870)
ami/ml/models/pipeline.py (2)
  • process_images (163-278)
  • process_images (1091-1105)
🪛 LanguageTool
object_model_diagram.md

[grammar] ~167-~167: Ensure spelling is correct
Context: ...ts 4. Job tracks progress through JobProgress and JobProgressStageDetail

(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)

🪛 markdownlint-cli2 (0.18.1)
object_model_diagram.md

15-15: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


31-31: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


38-38: Bare URL used

(MD034, no-bare-urls)


39-39: Bare URL used

(MD034, no-bare-urls)


40-40: Bare URL used

(MD034, no-bare-urls)


41-41: Bare URL used

(MD034, no-bare-urls)


42-42: Bare URL used

(MD034, no-bare-urls)


42-42: Bare URL used

(MD034, no-bare-urls)


43-43: Bare URL used

(MD034, no-bare-urls)


61-61: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


77-77: Bare URL used

(MD034, no-bare-urls)


97-97: Bare URL used

(MD034, no-bare-urls)


118-118: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)


122-122: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)


126-126: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)


130-130: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)

🪛 Ruff (0.14.2)
ami/ml/orchestration/nats_queue.py

70-70: Unused method argument: ttr

(ARG002)


73-73: Avoid specifying long messages outside the exception class

(TRY003)


81-81: Do not catch blind exception: Exception

(BLE001)


94-94: Avoid specifying long messages outside the exception class

(TRY003)


103-103: Do not catch blind exception: Exception

(BLE001)


132-132: Avoid specifying long messages outside the exception class

(TRY003)


146-146: Consider moving this statement to an else block

(TRY300)


148-148: Do not catch blind exception: Exception

(BLE001)


149-149: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


164-164: Avoid specifying long messages outside the exception class

(TRY003)


206-206: Do not catch blind exception: Exception

(BLE001)


207-207: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


221-221: Avoid specifying long messages outside the exception class

(TRY003)


226-226: Consider moving this statement to an else block

(TRY300)


227-227: Do not catch blind exception: Exception

(BLE001)


228-228: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


242-242: Avoid specifying long messages outside the exception class

(TRY003)


250-250: Consider moving this statement to an else block

(TRY300)


251-251: Do not catch blind exception: Exception

(BLE001)


252-252: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


266-266: Avoid specifying long messages outside the exception class

(TRY003)


273-273: Consider moving this statement to an else block

(TRY300)


274-274: Do not catch blind exception: Exception

(BLE001)


275-275: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/task_state.py

35-35: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


36-36: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

ami/jobs/views.py

33-43: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


73-73: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


218-218: Unused method argument: pk

(ARG002)


236-236: Avoid specifying long messages outside the exception class

(TRY003)


244-244: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


283-283: Avoid specifying long messages outside the exception class

(TRY003)


288-288: Avoid specifying long messages outside the exception class

(TRY003)


298-298: Avoid specifying long messages outside the exception class

(TRY003)


301-301: Avoid specifying long messages outside the exception class

(TRY003)


322-322: Do not catch blind exception: Exception

(BLE001)


323-323: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/tasks.py

45-45: Unused function argument: self

(ARG001)


120-120: Do not catch blind exception: Exception

(BLE001)


121-121: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


133-133: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


136-136: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

config/settings/base.py

28-28: Unused noqa directive (non-enabled: E231, E501)

Remove unused noqa directive

(RUF100)

ami/ml/orchestration/jobs.py

69-69: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


78-78: Do not catch blind exception: Exception

(BLE001)

ami/ml/orchestration/utils.py

14-14: Do not catch blind exception: Exception

(BLE001)


15-15: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/models.py

75-75: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


430-430: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


482-482: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

carlosgjs and others added 2 commits October 31, 2025 14:33
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
coderabbitai bot (Contributor) left a comment

Actionable comments posted: 2

♻️ Duplicate comments (1)
ami/jobs/tasks.py (1)

80-143: Don’t mark progress until after durable save + ACK

We remove image IDs from Redis and advance the “process” stage before save_results runs and well before we confirm the JetStream ACK. If save_results throws or the ACK fails, the task retries (or JetStream redelivers), yet Redis already shows the images done and the job stage is SUCCESS. At that point the worker cannot recover accurate progress, and the retry will double-write results. Move both update_state(..., "process") and the corresponding _update_job_progress call to after a successful save_results and ACK, and make ACK failure fatal so Celery retries before we mutate Redis. Perform the same gating for the “results” stage so we never remove IDs unless the message is fully settled.

-    state_manager = TaskStateManager(job_id)
-
-    progress_info = state_manager.update_state(processed_image_ids, stage="process", request_id=self.request.id)
-    if not progress_info:
-        ...
-    try:
-        _update_job_progress(job_id, "process", progress_info.percentage)
-        ...
-        if pipeline_result:
-            job.pipeline.save_results(...)
-        ...
-        ack_success = run_in_async_loop(...)
-        if ack_success:
-            ...
-        # Update job stage with calculated progress
-        progress_info = state_manager.update_state(processed_image_ids, stage="results", request_id=self.request.id)
-        if not progress_info:
-            ...
-        _update_job_progress(job_id, "results", progress_info.percentage)
+    state_manager = TaskStateManager(job_id)
+
+    try:
+        job = Job.objects.get(pk=job_id)
+        ...
+        if pipeline_result:
+            job.pipeline.save_results(results=pipeline_result, job_id=job.pk)
+            ...
+        ack_success = run_in_async_loop(...)
+        if not ack_success:
+            raise RuntimeError(f"Failed to acknowledge task via NATS: {reply_subject}")
+
+        progress_info = state_manager.update_state(
+            processed_image_ids, stage="process", request_id=self.request.id
+        )
+        if not progress_info:
+            raise self.retry(countdown=5, max_retries=10)
+        _update_job_progress(job_id, "process", progress_info.percentage)
+
+        progress_info = state_manager.update_state(
+            processed_image_ids, stage="results", request_id=self.request.id
+        )
+        if not progress_info:
+            raise self.retry(countdown=5, max_retries=10)
+        _update_job_progress(job_id, "results", progress_info.percentage)
🧹 Nitpick comments (4)
ami/jobs/utils.py (1)

36-48: Use logging.exception in exception handlers for better diagnostics.

In exception handlers, prefer logging.exception over logging.error to automatically include the stack trace, which aids debugging.

Based on learnings

Apply this diff:

     try:
         image = SourceImage.objects.select_related("deployment__project").get(pk=image_id)
     except SourceImage.DoesNotExist:
-        logger.error(f"SourceImage with id {image_id} does not exist")
+        logger.exception(f"SourceImage with id {image_id} does not exist")
         raise

     # Fetch the pipeline and validate it exists
     try:
         pipeline = Pipeline.objects.get(pk=pipeline_id)
     except Pipeline.DoesNotExist:
-        logger.error(f"Pipeline with id {pipeline_id} does not exist")
+        logger.exception(f"Pipeline with id {pipeline_id} does not exist")
         raise
ami/jobs/management/commands/process_single_image.py (3)

51-65: Consider removing duplicate validation logic.

The validation of SourceImage and Pipeline here duplicates the validation already performed inside submit_single_image_job (lines 36-48 in ami/jobs/utils.py). While this provides earlier feedback to the user, it creates maintenance overhead if validation logic changes.

If you prefer immediate user feedback, keep this validation. Otherwise, rely on the exceptions raised by submit_single_image_job and handle them in the submission try-except block (lines 71-78).


71-78: Replace blind Exception catch with specific exceptions.

Catching Exception is too broad and may mask unexpected errors. Catch only the specific exceptions that submit_single_image_job can raise.

Apply this diff:

         try:
             job = submit_single_image_job(
                 image_id=image_id,
                 pipeline_id=pipeline_id,
                 job_name=job_name,
             )
-        except Exception as e:
-            raise CommandError(f"Failed to submit job: {str(e)}")
+        except (SourceImage.DoesNotExist, Pipeline.DoesNotExist, Project.DoesNotExist) as e:
+            raise CommandError(f"Failed to submit job: {str(e)}") from e

Note: If you keep the validation at lines 51-65, SourceImage.DoesNotExist and Pipeline.DoesNotExist won't be raised here, so you can simplify to just Project.DoesNotExist.


161-161: Remove unused noqa directive.

The noqa: E221 directive is unnecessary as E221 is not an enabled rule in your Ruff configuration.

Apply this diff:

                 for error in job.progress.errors[-5:]:  # Last 5 errors
-                    self.stdout.write(f"  - {error}")  # noqa: E221
+                    self.stdout.write(f"  - {error}")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 84ee5a2 and 4480b0d.

📒 Files selected for processing (7)
  • ami/jobs/management/commands/README.md (1 hunks)
  • ami/jobs/management/commands/process_single_image.py (1 hunks)
  • ami/jobs/tasks.py (3 hunks)
  • ami/jobs/utils.py (1 hunks)
  • ami/ml/orchestration/__init__.py (1 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
  • ami/ml/orchestration/task_state.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
ami/jobs/management/commands/process_single_image.py (2)
ami/jobs/utils.py (1)
  • submit_single_image_job (12-78)
ami/main/models.py (1)
  • Classification (2208-2352)
ami/ml/orchestration/task_state.py (1)
ami/ml/orchestration/jobs.py (1)
  • cleanup (20-23)
ami/jobs/utils.py (1)
ami/jobs/models.py (2)
  • Job (734-1019)
  • enqueue (816-833)
ami/jobs/tasks.py (6)
ami/jobs/views.py (1)
  • result (256-339)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (28-294)
  • acknowledge_task (210-229)
ami/ml/orchestration/task_state.py (2)
  • TaskStateManager (17-126)
  • update_state (55-84)
ami/ml/orchestration/utils.py (1)
  • run_in_async_loop (8-18)
ami/jobs/models.py (4)
  • Job (734-1019)
  • logger (1004-1013)
  • update_stage (168-188)
  • save (954-965)
ami/ml/models/pipeline.py (3)
  • save_results (809-917)
  • save_results (1107-1108)
  • save (1115-1121)
ami/ml/orchestration/jobs.py (4)
ami/jobs/models.py (5)
  • Job (734-1019)
  • JobState (27-63)
  • logger (1004-1013)
  • update_stage (168-188)
  • save (954-965)
ami/ml/orchestration/nats_queue.py (2)
  • cleanup_job_resources (278-294)
  • publish_task (119-150)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-126)
  • cleanup (120-126)
  • initialize_job (38-50)
ami/ml/orchestration/utils.py (1)
  • run_in_async_loop (8-18)
🪛 Ruff (0.14.3)
ami/jobs/management/commands/process_single_image.py

44-44: Unused method argument: args

(ARG002)


58-58: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


58-58: Avoid specifying long messages outside the exception class

(TRY003)


65-65: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


65-65: Avoid specifying long messages outside the exception class

(TRY003)


77-77: Do not catch blind exception: Exception

(BLE001)


78-78: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


78-78: Avoid specifying long messages outside the exception class

(TRY003)


78-78: Use explicit conversion flag

Replace with conversion flag

(RUF010)


161-161: Unused noqa directive (non-enabled: E221)

Remove unused noqa directive

(RUF100)

ami/ml/orchestration/task_state.py

35-35: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


36-36: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


53-53: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

ami/jobs/utils.py

40-40: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


47-47: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/__init__.py

5-5: Unused noqa directive (unused: F403; non-enabled: F401)

Remove unused noqa directive

(RUF100)

ami/jobs/tasks.py

128-128: Do not catch blind exception: Exception

(BLE001)


129-129: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


140-140: Abstract raise to an inner function

(TRY301)


144-144: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


147-147: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/jobs.py

69-69: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


78-78: Do not catch blind exception: Exception

(BLE001)

🔇 Additional comments (4)
ami/jobs/utils.py (1)

56-78: Well-structured job creation and enqueuing flow.

The implementation correctly generates a default job name, creates the Job with appropriate associations (project, pipeline, image), and enqueues it via Celery. Logging at key points aids observability.

ami/jobs/management/commands/process_single_image.py (3)

18-42: Well-designed CLI interface.

The argument definitions are clear and provide good defaults. The --wait flag with configurable --poll-interval offers flexible UX for both scripted and interactive use.


100-131: Excellent job polling and progress display implementation.

The polling loop efficiently tracks status changes, provides live progress updates using \r for in-place updates, and gracefully handles KeyboardInterrupt. This creates a polished user experience.


137-165: Comprehensive result reporting enhances debuggability.

The result display logic provides actionable feedback for success, failure, and unexpected statuses, including detection/classification counts and error summaries. This is particularly valuable for testing and debugging workflows.

Comment on lines 50 to 54
# Infer project from image if not provided
if project_id is None:
    project = image.deployment.project
else:
    project = Project.objects.get(pk=project_id)
coderabbitai bot (Contributor) left a comment

⚠️ Potential issue | 🟡 Minor

Handle potential Project.DoesNotExist exception.

When project_id is explicitly provided, Project.objects.get(pk=project_id) can raise Project.DoesNotExist, but this exception is neither caught nor documented in the function's Raises section.

Wrap the fetch in a try-except block and document the exception:

     # Infer project from image if not provided
     if project_id is None:
         project = image.deployment.project
     else:
-        project = Project.objects.get(pk=project_id)
+        try:
+            project = Project.objects.get(pk=project_id)
+        except Project.DoesNotExist:
+            logger.exception(f"Project with id {project_id} does not exist")
+            raise

And update the docstring:

     Raises:
         SourceImage.DoesNotExist: If the image doesn't exist
         Pipeline.DoesNotExist: If the pipeline doesn't exist
+        Project.DoesNotExist: If the provided project_id doesn't exist
🤖 Prompt for AI Agents
In ami/jobs/utils.py around lines 50 to 54, the code calls
Project.objects.get(pk=project_id) without handling Project.DoesNotExist; wrap
that call in a try/except that catches Project.DoesNotExist and re-raises it
with clearer context (e.g., include project_id and a brief message) using "raise
Project.DoesNotExist(... ) from e" so stacktrace is preserved, and update the
function docstring's Raises section to list Project.DoesNotExist with the new
behavior.

Comment on lines 38 to 126
def initialize_job(self, image_ids: list[str], stages: list[str]) -> None:
    """
    Initialize job tracking with a list of image IDs to process.

    Args:
        image_ids: List of image IDs that need to be processed
        stages: List of stages to track for each image
    """
    self.stages = stages
    for stage in stages:
        cache.set(self._get_pending_key(stage), image_ids, timeout=self.TIMEOUT)

    cache.set(self._total_key, len(image_ids), timeout=self.TIMEOUT)

def _get_pending_key(self, stage: str) -> str:
    return f"{self._pending_key}:{stage}"  # noqa E231

def update_state(
    self,
    processed_image_ids: set[str],
    stage: str,
    request_id: str,
) -> None | TaskProgress:
    """
    Update the task state with newly processed images.

    Args:
        processed_image_ids: Set of image IDs that have just been processed
    """
    # Create a unique lock key for this job
    lock_key = f"job:{self.job_id}:process_results_lock"
    lock_timeout = 360  # 6 minutes (matches task time_limit)
    lock_acquired = cache.add(lock_key, request_id, timeout=lock_timeout)
    if not lock_acquired:
        return None

    try:
        # Update progress tracking in Redis
        progress_info = self._get_progress(processed_image_ids, stage)
        return progress_info
    finally:
        # Always release the lock when done
        current_lock_value = cache.get(lock_key)
        # Only delete if we still own the lock (prevents race condition)
        if current_lock_value == request_id:
            cache.delete(lock_key)
            logger.debug(f"Released lock for job {self.job_id}, task {request_id}")

def _get_progress(self, processed_image_ids: set[str], stage: str) -> TaskProgress | None:
    """
    Get current progress information for the job.

    Returns:
        TaskProgress namedtuple with fields:
        - remaining: Number of images still pending (or None if not tracked)
        - total: Total number of images (or None if not tracked)
        - processed: Number of images processed (or None if not tracked)
        - percentage: Progress as float 0.0-1.0 (or None if not tracked)
    """
    pending_images = cache.get(self._get_pending_key(stage))
    total_images = cache.get(self._total_key)
    if pending_images is None or total_images is None:
        return None
    remaining_images = [img_id for img_id in pending_images if img_id not in processed_image_ids]
    assert len(pending_images) >= len(remaining_images)
    cache.set(self._get_pending_key(stage), remaining_images, timeout=self.TIMEOUT)

    remaining = len(remaining_images)
    processed = total_images - remaining
    percentage = float(processed) / total_images if total_images > 0 else 1.0
    logger.info(
        f"Pending images from Redis for job {self.job_id} {stage}: "
        f"{remaining}/{total_images}: {percentage*100}%"
    )

    return TaskProgress(
        remaining=remaining,
        total=total_images,
        processed=processed,
        percentage=percentage,
    )

def cleanup(self) -> None:
    """
    Delete all Redis keys associated with this job.
    """
    for stage in self.stages:
        cache.delete(self._get_pending_key(stage))
    cache.delete(self._total_key)
coderabbitai bot (Contributor) left a comment

⚠️ Potential issue | 🟠 Major

Persist the tracked stages so cleanup() works in new processes

cleanup() assumes self.stages is populated on this instance, but initialize_job() only sets it in-memory. As soon as another worker or process instantiates TaskStateManager(job_id) (the typical pattern for cleanup after a job finishes), self.stages is missing and cleanup() raises AttributeError, leaving Redis keys behind. Persist the stage list alongside the other job keys so any instance can retrieve it before cleaning up.

 class TaskStateManager:
@@
-        self._pending_key = f"job:{job_id}:pending_images"  # noqa E231
-        self._total_key = f"job:{job_id}:pending_images_total"  # noqa E231
+        self._pending_key = f"job:{job_id}:pending_images"
+        self._total_key = f"job:{job_id}:pending_images_total"
+        self._stages_key = f"job:{job_id}:pending_images_stages"
+        self.stages: list[str] = cache.get(self._stages_key, [])
@@
-        self.stages = stages
+        self.stages = stages
+        cache.set(self._stages_key, stages, timeout=self.TIMEOUT)
@@
     def cleanup(self) -> None:
@@
-        for stage in self.stages:
+        for stage in self.stages:
             cache.delete(self._get_pending_key(stage))
         cache.delete(self._total_key)
+        cache.delete(self._stages_key)

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.14.3)

53-53: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
ami/jobs/models.py (1)

412-413: Consider adding type hints and a docstring.

The new process_images method would benefit from explicit type hints for its parameters and a docstring describing its purpose, especially since the codebase is adding type annotations (see line 366).

Example:

@classmethod
def process_images(cls, job: "Job", images: list[SourceImage]) -> None:
    """
    Process images through the ML pipeline in batches.
    
    Args:
        job: The Job instance to update with progress
        images: List of SourceImage instances to process
    """
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4480b0d and 3032709.

📒 Files selected for processing (1)
  • ami/jobs/models.py (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/jobs/models.py (3)
ami/ml/orchestration/jobs.py (1)
  • queue_images_to_nats (28-107)
ami/main/models.py (12)
  • SourceImage (1622-1870)
  • save (320-323)
  • save (883-896)
  • save (1118-1121)
  • save (1461-1468)
  • save (1560-1563)
  • save (1828-1831)
  • save (2109-2130)
  • save (2346-2352)
  • save (2525-2530)
  • save (2813-2830)
  • save (3450-3453)
ami/ml/models/pipeline.py (3)
  • save (1115-1121)
  • process_images (163-278)
  • process_images (1091-1105)
🪛 Ruff (0.14.3)
ami/jobs/models.py

75-75: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


437-437: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


489-489: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

🔇 Additional comments (3)
ami/jobs/models.py (3)

325-326: Good use of local import to avoid circular dependencies.

Importing queue_images_to_nats within the method scope is appropriate here, especially if it helps avoid circular import issues between the jobs and orchestration modules.


366-366: Good addition of type annotation.

The explicit type annotation list[SourceImage] improves code clarity and enables better IDE support.


400-410: LGTM: Async routing with proper failure handling.

The feature-flag-driven routing logic correctly handles the failure case when queue_images_to_nats returns False by marking the job as FAILURE, logging the error, and exiting early. This addresses the critical concern from previous reviews.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
config/settings/base.py (1)

27-29: Remove the unused noqa directive.

The # noqa: E231, E501 comment on line 28 is suppressing lint rules that aren't triggered, causing a Ruff RUF100 violation. This issue was previously flagged and should be addressed.

🧹 Nitpick comments (2)
README.md (1)

43-43: Consider wrapping the URL in angle brackets.

To satisfy the Markdown linter, wrap the bare URL in angle brackets: <https://natsdashboard.com/>.

Apply this diff:

-- NATS dashboard: https://natsdashboard.com/ (Add localhost)
+- NATS dashboard: <https://natsdashboard.com/> (Add localhost)
ami/utils/requests.py (1)

149-154: Consider using OpenApiTypes for consistency.

For consistency with the newly added parameters, consider updating project_id_doc_param to use OpenApiTypes.INT instead of the built-in int type.

 project_id_doc_param = OpenApiParameter(
     name="project_id",
     description="Filter by project ID",
     required=False,
-    type=int,
+    type=OpenApiTypes.INT,
 )
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3032709 and 3e7ef3b.

📒 Files selected for processing (4)
  • .gitignore (1 hunks)
  • README.md (1 hunks)
  • ami/utils/requests.py (2 hunks)
  • config/settings/base.py (2 hunks)
🧰 Additional context used
🪛 markdownlint-cli2 (0.18.1)
README.md

43-43: Bare URL used

(MD034, no-bare-urls)

🪛 Ruff (0.14.3)
config/settings/base.py

28-28: Unused noqa directive (non-enabled: E231, E501)

Remove unused noqa directive

(RUF100)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Redirect rules
  • GitHub Check: Header rules
  • GitHub Check: Pages changed
  • GitHub Check: test
🔇 Additional comments (4)
.gitignore (1)

280-281: Clean .gitignore addition.

The new entry appropriately excludes HuggingFace cache directories from version control, following established patterns in the file. The formatting and spacing are consistent with surrounding entries.

config/settings/base.py (1)

266-268: LGTM!

The NATS_URL configuration is properly added with a sensible default and follows the existing pattern for environment variable configuration.

ami/utils/requests.py (2)

156-167: LGTM! Clear and well-structured parameter definitions.

Both ids_only_param and incomplete_only_param are correctly defined with appropriate types and clear descriptions for the OpenAPI schema.


168-173: Batch parameter validation is properly implemented.

The endpoint already validates the batch parameter at ami/jobs/views.py:230 using IntegerField(required=False, min_value=1).clean(), which ensures it is a positive integer with a default of 1. No further action is needed.

carlosgjs added a commit to uw-ssec/antenna that referenced this pull request Nov 11, 2025
…result endpoints)

- Add ids_only and incomplete_only query parameters to job list endpoint
- Add JobFilterSet with pipeline__slug filtering and search fields
- Add tasks() endpoint (stubbed) for NATS JetStream integration
- Add result() endpoint (stubbed) for pipeline result processing
- Add utility OpenApiParameter definitions for new query params

The tasks() and result() endpoints are documented and stubbed out,
awaiting NATS and Celery task dependencies from PR RolnickLab#987.
coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
config/settings/base.py (1)

27-29: Remove the unused noqa directive.

The # noqa: E231, E501 comment is suppressing non-enabled rules, causing a lint failure (RUF100). This was previously flagged but remains unaddressed.

Apply this diff:

 EXTERNAL_BASE_URL = env(
-    "EXTERNAL_BASE_URL", default=f"http://{EXTERNAL_HOSTNAME}"  # noqa: E231, E501 # type: ignore[no-untyped-call]
+    "EXTERNAL_BASE_URL",
+    default=f"http://{EXTERNAL_HOSTNAME}",
 )
+# type: ignore[no-untyped-call]
🧹 Nitpick comments (1)
README.md (1)

69-69: Use markdown link syntax and clarify the instruction.

The bare URL violates markdown linting (MD034), and the "(Add localhost)" instruction could be clearer for users unfamiliar with the NATS dashboard.

Apply this diff:

-- NATS dashboard: https://natsdashboard.com/ (Add localhost)
+- [NATS dashboard](https://natsdashboard.com/) (Add `nats://localhost:4222` as a server)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3e7ef3b and 04be994.

📒 Files selected for processing (5)
  • .envs/.local/.django (1 hunks)
  • README.md (1 hunks)
  • ami/main/models.py (1 hunks)
  • config/settings/base.py (2 hunks)
  • docker-compose.yml (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • docker-compose.yml
🧰 Additional context used
🪛 markdownlint-cli2 (0.18.1)
README.md

69-69: Bare URL used

(MD034, no-bare-urls)

🪛 Ruff (0.14.5)
config/settings/base.py

28-28: Unused noqa directive (non-enabled: E231, E501)

Remove unused noqa directive

(RUF100)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Redirect rules
  • GitHub Check: Header rules
  • GitHub Check: test
  • GitHub Check: Pages changed
🔇 Additional comments (3)
ami/main/models.py (1)

219-219: LGTM! Clean feature flag addition.

The new async_pipeline_workers flag is well-named, properly documented, and follows the existing pattern. The default of False is appropriate for a new opt-in feature.

config/settings/base.py (1)

267-269: LGTM! Clean NATS configuration addition.

The new NATS_URL configuration follows the established pattern (similar to REDIS_URL) and uses a sensible default for local development.

.envs/.local/.django (1)

15-16: LGTM! Correct Docker environment configuration.

The NATS_URL configuration correctly uses the Docker service name nats for container networking and follows the established pattern of other service URLs in this file.

@mihow mihow changed the title feat: Processing service V2 [integrationProcessing service V2 Nov 21, 2025
@mihow mihow changed the title [integrationProcessing service V2 [integration] Processing service V2 Nov 21, 2025
coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
ami/jobs/models.py (1)

136-136: Drop unnecessary E713 noqa directives on these ValueErrors

These two ValueError lines only contain string literals; flake8’s E713 (“test for membership should be ‘not in’”) does not apply here, so # noqa E713 isn’t suppressing anything and just adds noise. Recommend removing both trailing # noqa E713 comments and letting linters run normally on these lines.

Also applies to: 143-143

ami/ml/orchestration/task_state.py (1)

27-37: Persist stages so cleanup() works across processes and doesn’t leak keys

Right now initialize_job() only records self.stages on the current instance. If TaskStateManager(job_id) is later constructed in a different worker/process (the typical case for cleanup), self.stages is undefined and cleanup() will either raise AttributeError or silently skip stage-specific keys, leaving Redis entries behind.

To make cleanup robust across processes:

  • Persist the stage list in cache (e.g., _stages_key = f"job:{job_id}:pending_images_stages") in initialize_job().
  • In __init__, read self.stages from that key (defaulting to an empty list).
  • In cleanup(), iterate over self.stages and then delete both the total key and the stages key.

This ensures any TaskStateManager(job_id) instance can safely clean up all keys even if it never called initialize_job() itself.

Also applies to: 38-51, 120-126

🧹 Nitpick comments (1)
ami/jobs/models.py (1)

412-528: process_images extraction keeps v1 behavior together and readable

Pulling the synchronous image-processing logic into MLJob.process_images keeps the legacy path intact while making the v1 flow easier to follow and evolve alongside the async path. Batching, progress updates, and save-result subtask handling all appear consistent with the prior inline logic.

Optional: add type hints (job: Job, images: list[SourceImage]) and a brief docstring on process_images to make its contract and expectations explicit.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 04be994 and 0a5c89e.

📒 Files selected for processing (5)
  • .envs/.local/.django (1 hunks)
  • ami/jobs/models.py (4 hunks)
  • ami/main/models.py (1 hunks)
  • ami/ml/orchestration/task_state.py (1 hunks)
  • config/settings/base.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • ami/main/models.py
  • config/settings/base.py
🧰 Additional context used
🧬 Code graph analysis (2)
ami/jobs/models.py (3)
ami/base/schemas.py (1)
  • ConfigurableStageParam (6-12)
ami/ml/orchestration/jobs.py (1)
  • queue_images_to_nats (28-107)
ami/main/models.py (12)
  • SourceImage (1666-1914)
  • save (323-326)
  • save (904-917)
  • save (1139-1142)
  • save (1482-1489)
  • save (1581-1584)
  • save (1872-1875)
  • save (2153-2174)
  • save (2390-2396)
  • save (2569-2574)
  • save (2857-2874)
  • save (3494-3497)
ami/ml/orchestration/task_state.py (1)
ami/ml/orchestration/jobs.py (1)
  • cleanup (20-23)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Redirect rules
  • GitHub Check: Header rules
  • GitHub Check: Pages changed
  • GitHub Check: test
🔇 Additional comments (1)
.envs/.local/.django (1)

15-16: NATS_URL env wiring looks correct

The new NATS_URL=nats://nats:4222 entry is consistent with the other service URLs and appropriate for local Docker-based development; no issues from a config or secrets standpoint.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 4

♻️ Duplicate comments (1)
ami/jobs/models.py (1)

366-377: Zero-image jobs in async mode still look “running/successful” but never complete cleanly

If images is empty and async_pipeline_workers is True:

  • queue_images_to_nats initializes Redis state and sets the "results" stage to SUCCESS with progress=1.0, then returns True.
  • MLJob.run treats this as success and returns without setting finished_at or explicitly marking the job as completed in a way that distinguishes “nothing to do” from “queued work”.
  • The legacy sync path (process_images) does finalize zero-image jobs (sets SUCCESS and finished_at).

This reproduces the earlier review concern: “empty” async jobs can appear stuck or oddly reported compared to v1 behavior.

A simple fix is to short‑circuit in MLJob.run before the async branch:

        source_image_count = len(images)
        job.progress.update_stage("collect", total_images=source_image_count)

+       if job.project.feature_flags.async_pipeline_workers and source_image_count == 0:
+           job.logger.info("No images to process; marking async ML job as SUCCESS")
+           job.progress.update_stage("process", status=JobState.SUCCESS, progress=1.0)
+           job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
+           job.update_status(JobState.SUCCESS, save=False)
+           job.finished_at = datetime.datetime.now()
+           job.save()
+           return
+
        if job.shuffle and source_image_count > 1:

This keeps async behavior aligned with the sync path and avoids “STARTED forever / SUCCESS with no work” edge cases.

Also applies to: 400-410

🧹 Nitpick comments (4)
ami/jobs/models.py (1)

412-528: MLJob.process_images extraction looks consistent; minor polish only

The new process_images classmethod mirrors the existing in-process behavior and correctly:

  • Respects request_source_image_batch_size for chunking.
  • Tracks per-batch processing and save subtasks.
  • Finalizes stages and job status (JobState.SUCCESS) and finished_at.

Two minor nits you might consider (non-blocking):

  • The bare # noqa on the chunks = ... line has no obvious need; if it’s not suppressing a specific rule anymore, drop it to avoid future RUF100 noise.
  • job.logger.warn is deprecated in favor of job.logger.warning (Line 386).

Otherwise this refactor looks good and keeps the v1 path behavior intact.

ami/ml/orchestration/jobs.py (2)

11-27: Surface cleanup result or at least log outcome from cleanup_nats_resources

cleanup_nats_resources currently discards the boolean returned by cleanup_job_resources and provides no logging, even though it’s intended for operational cleanup.

Consider either:

  • Returning the boolean so callers can act on failures, and/or
  • Adding a log line using job.logger or this module’s logger to record success/failure.

This will make diagnosing stuck NATS streams/consumers easier once this TODO is wired into job completion.
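A possible shape, as a hedged sketch (the exact signatures of run_in_async_loop and TaskQueueManager.cleanup_job_resources are assumed from the code-graph summaries in this review):

def cleanup_nats_resources(job: "Job") -> bool:
    """Tear down the job's NATS stream/consumer and report the outcome."""

    async def _cleanup() -> bool:
        async with TaskQueueManager() as manager:
            return await manager.cleanup_job_resources(job.pk)

    success = run_in_async_loop(_cleanup())
    if success:
        logger.info(f"Cleaned up NATS resources for job {job.pk}")
    else:
        logger.error(f"Failed to clean up NATS resources for job {job.pk}")
    return success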


29-108: Queueing flow is solid; minor style and robustness tweaks suggested

The queue_images_to_nats implementation is generally good (prepares messages outside async, initializes Redis state, and logs success/failure). A few small points:

  • Line 70: the loop variable i in for i, (image_pk, message) in enumerate(messages): is unused. Rename it to _ or drop enumerate to satisfy Ruff's B007 and avoid confusion.
  • Lines 79–81: the broad except Exception as e: over the publish call is understandable at a network boundary, but if NATS exposes a narrower exception hierarchy you could catch those instead, or at least document why Exception is intentional.

Example tweak for the loop:

-        async with TaskQueueManager() as manager:
-            for i, (image_pk, message) in enumerate(messages):
+        async with TaskQueueManager() as manager:
+            for image_pk, message in messages:
                 try:
                     logger.info(f"Queueing image {image_pk} to stream for job '{job_id}': {message}")

Overall behavior and return semantics (True only when all publishes succeed) look correct.

ami/jobs/tasks.py (1)

132-149: Improve exception logging around result processing and ACKs

The catch blocks around job lookup and the main processing path currently use logger.error with an exception message:

  • Line 129: except Exception as ack_error: logger.error(...)
  • Line 144: except Job.DoesNotExist: logger.error(...)
  • Line 147: except Exception as e: logger.error(...)

Given you immediately re-raise in each case, switching to logger.exception (or job.logger.exception where available) will include the traceback without extra code, making these failure paths much easier to debug. You can still keep the human-readable message.

Example:

-    except Exception as e:
-        logger.error(f"Failed to process pipeline result for job {job_id}: {e}")
+    except Exception:
+        logger.exception(f"Failed to process pipeline result for job {job_id}")
         # Celery will automatically retry based on autoretry_for
         raise

Same idea applies to the ACK error and Job.DoesNotExist cases.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0a5c89e and 344f883.

📒 Files selected for processing (3)
  • ami/jobs/models.py (3 hunks)
  • ami/jobs/tasks.py (3 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
ami/jobs/models.py (2)
ami/ml/orchestration/jobs.py (1)
  • queue_images_to_nats (29-108)
ami/main/models.py (12)
  • SourceImage (1666-1914)
  • save (323-326)
  • save (904-917)
  • save (1139-1142)
  • save (1482-1489)
  • save (1581-1584)
  • save (1872-1875)
  • save (2153-2174)
  • save (2390-2396)
  • save (2569-2574)
  • save (2857-2874)
  • save (3494-3497)
ami/ml/orchestration/jobs.py (3)
ami/jobs/models.py (4)
  • Job (734-1019)
  • logger (1004-1013)
  • update_stage (168-188)
  • save (954-965)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (28-294)
  • cleanup_job_resources (278-294)
  • publish_task (119-150)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-126)
  • cleanup (120-126)
  • initialize_job (38-50)
ami/jobs/tasks.py (5)
ami/jobs/views.py (1)
  • result (256-339)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (28-294)
  • acknowledge_task (210-229)
ami/ml/orchestration/task_state.py (2)
  • TaskStateManager (17-126)
  • update_state (55-84)
ami/jobs/models.py (4)
  • Job (734-1019)
  • logger (1004-1013)
  • update_stage (168-188)
  • save (954-965)
ami/ml/models/pipeline.py (3)
  • save_results (814-922)
  • save_results (1120-1121)
  • save (1128-1134)
🪛 Ruff (0.14.5)
ami/ml/orchestration/jobs.py

70-70: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


79-79: Do not catch blind exception: Exception

(BLE001)

ami/jobs/tasks.py

128-128: Do not catch blind exception: Exception

(BLE001)


129-129: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


140-140: Abstract raise to an inner function

(TRY301)


144-144: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


147-147: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (1)
ami/jobs/tasks.py (1)

201-223: log_time helper is fine; behaviour matches usage

The log_time utility correctly:

  • Uses time.perf_counter() for timing.
  • Returns both elapsed duration and a callable that can be reused for subsequent measurements.
  • Only logs when both msg is provided and start > 0, which fits the usage pattern in process_pipeline_result.

No changes needed here.
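For readers without the diff handy, here is a reconstruction of the described behaviour; it is inferred from the notes above, and the PR's actual signature may differ:

import logging
import time
from functools import partial

logger = logging.getLogger(__name__)

def log_time(msg: str | None = None, start: float = 0.0):
    """Return (elapsed, timer); log only when msg is given and start > 0."""
    now = time.perf_counter()
    elapsed = now - start
    if msg and start > 0:
        logger.info(f"{msg} took {elapsed:.2f}s")
    # The returned partial can be called again for the next measurement
    return elapsed, partial(log_time, start=now)

# Usage: start a timer, then log successive phases
_, timer = log_time()
# ... do work ...
elapsed, timer = timer("saved detections")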

Comment on lines +400 to +410
        if job.project.feature_flags.async_pipeline_workers:
            queued = queue_images_to_nats(job, images)
            if not queued:
                job.logger.error("Aborting job %s because images could not be queued to NATS", job.pk)
                job.progress.update_stage("collect", status=JobState.FAILURE)
                job.update_status(JobState.FAILURE)
                job.finished_at = datetime.datetime.now()
                job.save()
                return
        else:
            cls.process_images(job, images)

⚠️ Potential issue | 🟠 Major

Async ML path leaves overall job lifecycle undefined (status/finished_at, Celery status mismatch)

When async_pipeline_workers is enabled, MLJob.run queues NATS tasks and returns immediately. The surrounding run_job Celery task then completes, and update_job_status sets job.status to the Celery task status (SUCCESS), even though:

  • No images may yet have been processed by workers.
  • finished_at is never set for the async path.
  • The only subsequent updates come from process_pipeline_result, which adjusts per-stage progress only (via _update_job_progress) and never touches job.status or finished_at.

Net effect: jobs can show SUCCESS while process/results stages are still <100%, and even once stages reach 100% there is no authoritative completion timestamp or explicit terminal status driven by the async pipeline.

Consider tightening this by:

  • Having the async pipeline be the source of truth for completion, e.g. in _update_job_progress when stage == "results" and progress_percentage >= 1.0, set job.status to JobState.SUCCESS, set finished_at, and (optionally) trigger NATS/Redis cleanup.
  • Optionally, for jobs with async_pipeline_workers=True, avoid overwriting job.status in update_job_status based solely on the run_job Celery task’s status, or treat that status as “queueing succeeded” only.

This will make the async path match the synchronous process_images semantics and avoid confusing “SUCCESS, 0% complete” states.
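A hypothetical shape for the second option (update_job_status's real signature is not shown in this PR, so the names below are assumptions):

def update_job_status(job, celery_status):
    # Hypothetical guard: for pull-based jobs, Celery SUCCESS only means
    # "queueing finished"; authoritative completion comes from
    # process_pipeline_result instead.
    if job.project.feature_flags.async_pipeline_workers:
        return
    job.status = celery_status
    job.save()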

🤖 Prompt for AI Agents
In ami/jobs/models.py around lines 400-410, the async pipeline path only queues
work and returns, leaving job.status and finished_at unset and allowing the
Celery run_job task to mark the job SUCCESS prematurely; update the logic so
that for async_pipeline_workers you do not overwrite job.status based solely on
the run_job Celery task (treat that status as "queued" or leave unchanged), and
move authoritative completion handling into the async progress handler: in
_update_job_progress, when stage == "results" and progress_percentage >= 1.0 set
job.status = JobState.SUCCESS, set job.finished_at = now(), save the job (and
optionally perform NATS/Redis cleanup), and ensure any queued-path error
handling still sets FAILURE and finished_at as currently done.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
ami/ml/orchestration/nats_queue.py (1)

94-120: Detect and recreate consumer when TTR (ack_wait) changes.

The current code creates a consumer with ack_wait=TASK_TTR but never updates it if the consumer already exists with a different value. JetStream doesn't allow mutating ack_wait on existing durables, so if the consumer is created with one TTR and later code expects a different TTR, the old value persists. This causes premature timeouts and unintended redeliveries.

For example, if reserve_task() creates the consumer before any publish_task() calls, the consumer will use TASK_TTR=300. If future code changes expect a different TTR, the consumer retains the old value.

Check the existing consumer's ack_wait and recreate it when mismatched:

     async def _ensure_consumer(self, job_id: str):
         """Ensure consumer exists for the given job."""
         if self.js is None:
             raise RuntimeError("Connection is not open. Use TaskQueueManager as an async context manager.")
 
         stream_name = self._get_stream_name(job_id)
         consumer_name = self._get_consumer_name(job_id)
         subject = self._get_subject(job_id)
 
+        info = None
         try:
             info = await self.js.consumer_info(stream_name, consumer_name)
-            logger.debug(f"Consumer {consumer_name} already exists: {info}")
+            # Check if ack_wait matches the expected TTR
+            ack_wait_seconds = int(info.config.ack_wait.total_seconds())
+            if ack_wait_seconds != TASK_TTR:
+                logger.info(
+                    f"Consumer {consumer_name} has ack_wait={ack_wait_seconds}s, "
+                    f"expected {TASK_TTR}s. Recreating consumer."
+                )
+                await self.js.delete_consumer(stream_name, consumer_name)
+                info = None
+            else:
+                logger.debug(f"Consumer {consumer_name} already exists with correct ack_wait={ack_wait_seconds}s")
         except Exception:
-            # Consumer doesn't exist, create it
+            info = None
+
+        if info is None:
             await self.js.add_consumer(
                 stream=stream_name,
                 config=ConsumerConfig(
                     durable_name=consumer_name,
                     ack_policy=AckPolicy.EXPLICIT,
                     ack_wait=TASK_TTR,  # Visibility timeout (TTR)
                     max_deliver=5,  # Max retry attempts
                     deliver_policy=DeliverPolicy.ALL,
                     max_ack_pending=100,  # Max unacked messages
                     filter_subject=subject,
                 ),
             )
-            logger.info(f"Created consumer {consumer_name}")
+            logger.info(f"Created consumer {consumer_name} with ack_wait={TASK_TTR}s")

Based on learnings

🧹 Nitpick comments (3)
ami/ml/orchestration/task_state.py (1)

26-26: Annotate STAGES with ClassVar for clarity.

The mutable class attribute should be annotated with typing.ClassVar to make it explicit that this is a class-level constant shared across all instances.

Apply this diff:

+from typing import ClassVar
+
 class TaskStateManager:
     """
     Manages job progress tracking state in Redis.
 
     Tracks pending images for jobs to calculate progress percentages
     as workers process images asynchronously.
     """
 
     TIMEOUT = 86400 * 7  # 7 days in seconds
-    STAGES = ["process", "results"]
+    STAGES: ClassVar[list[str]] = ["process", "results"]
ami/ml/orchestration/jobs.py (2)

70-70: Remove unused loop variable.

The loop variable i is not used within the loop body.

Apply this diff:

-        for i, (image_pk, message) in enumerate(messages):
+        for image_pk, message in messages:

95-107: Consider clarifying the empty-images case flow.

When images is empty, the job progress is updated at lines 95-97, then the function continues through the result logging logic. While correct, the flow is subtle. Consider adding an early return after line 97 to make the empty-images case explicit:

     if not images:
         job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
         job.save()
+        job.logger.info(f"No images to queue for job '{job_id}'")
+        return True
 
     # Log results (back in sync context)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 344f883 and df7eaa3.

📒 Files selected for processing (3)
  • ami/ml/orchestration/jobs.py (1 hunks)
  • ami/ml/orchestration/nats_queue.py (1 hunks)
  • ami/ml/orchestration/task_state.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
ami/ml/orchestration/nats_queue.py (1)
ami/jobs/views.py (1)
  • result (256-339)
ami/ml/orchestration/jobs.py (3)
ami/jobs/models.py (4)
  • Job (734-1019)
  • JobState (27-63)
  • logger (1004-1013)
  • update_stage (168-188)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (31-296)
  • cleanup_job_resources (280-296)
  • publish_task (122-152)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-125)
  • cleanup (119-125)
  • initialize_job (39-49)
🪛 Ruff (0.14.5)
ami/ml/orchestration/nats_queue.py

76-76: Avoid specifying long messages outside the exception class

(TRY003)


84-84: Do not catch blind exception: Exception

(BLE001)


97-97: Avoid specifying long messages outside the exception class

(TRY003)


106-106: Do not catch blind exception: Exception

(BLE001)


134-134: Avoid specifying long messages outside the exception class

(TRY003)


148-148: Consider moving this statement to an else block

(TRY300)


150-150: Do not catch blind exception: Exception

(BLE001)


151-151: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


166-166: Avoid specifying long messages outside the exception class

(TRY003)


208-208: Do not catch blind exception: Exception

(BLE001)


209-209: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


223-223: Avoid specifying long messages outside the exception class

(TRY003)


228-228: Consider moving this statement to an else block

(TRY300)


229-229: Do not catch blind exception: Exception

(BLE001)


230-230: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


244-244: Avoid specifying long messages outside the exception class

(TRY003)


252-252: Consider moving this statement to an else block

(TRY300)


253-253: Do not catch blind exception: Exception

(BLE001)


254-254: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


268-268: Avoid specifying long messages outside the exception class

(TRY003)


275-275: Consider moving this statement to an else block

(TRY300)


276-276: Do not catch blind exception: Exception

(BLE001)


277-277: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/jobs.py

70-70: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


78-78: Do not catch blind exception: Exception

(BLE001)

ami/ml/orchestration/task_state.py

26-26: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

♻️ Duplicate comments (1)
ami/jobs/tasks.py (1)

160-166: Use timezone-aware datetime for finished_at.

Line 163 uses datetime.now() which returns a naive (non-timezone-aware) datetime. Django models typically expect timezone-aware datetimes, especially when USE_TZ = True. This can cause subtle bugs or database inconsistencies.

Apply this diff to use Django's timezone-aware datetime:

-from datetime import datetime
+from django.utils import timezone

And then:

         if stage == "results" and progress_percentage >= 1.0:
             job.status = JobState.SUCCESS
             job.progress.summary.status = JobState.SUCCESS
-            job.finished_at = datetime.now()
+            job.finished_at = timezone.now()
🧹 Nitpick comments (8)
ami/ml/orchestration/jobs.py (4)

11-26: Previous issue addressed; update docstring to document return value.

The function now correctly returns the cleanup result (addressing the previous review comment), but the docstring is missing the Returns: section.

Apply this diff to complete the docstring:

 def cleanup_nats_resources(job: "Job") -> bool:
     """
     Clean up NATS JetStream resources (stream and consumer) for a completed job.
 
     Args:
         job: The Job instance
+
+    Returns:
+        bool: True if cleanup successful, False otherwise
     """

72-72: Consider using job.logger for consistency.

This line uses the module-level logger, while other logging in this function uses job.logger. Using job.logger consistently ensures all logs are captured by the JobLogHandler.

-                    logger.info(f"Queueing image {image_pk} to stream for job '{job_id}': {message}")
+                    job.logger.info(f"Queueing image {image_pk} to stream for job '{job_id}': {message}")

Note: You'll need to pass job into the async function or capture it from the outer scope (which should work since it's already in scope).


95-97: Consider NATS resource lifecycle for empty job.

When there are no images, job progress is marked complete, but it is unclear whether NATS stream/consumer resources should be created at all, or cleaned up immediately if they already exist.

This could lead to orphaned NATS resources. Consider:

  1. Skipping NATS resource creation entirely when len(images) == 0 (see the sketch after this list), or
  2. Calling cleanup_nats_resources(job) immediately after marking the job complete.
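Option 1 could be an early exit at the top of queue_images_to_nats, before any NATS resources are touched (a sketch, using the stage names from this PR):

    if not images:
        # Nothing to publish: mark both stages done and skip NATS entirely
        job.progress.update_stage("process", status=JobState.SUCCESS, progress=1.0)
        job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
        job.save()
        return True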

103-106: Document partial failure behavior.

When some images fail to queue, the function returns False, but successfully queued images remain in the NATS stream. This could leave the job in an inconsistent state.

Consider documenting this behavior in the docstring, or implement one of these strategies:

  1. Return success if at least some images were queued, and let the job track partial completion
  2. Implement a rollback mechanism to remove successfully queued messages when any failure occurs
  3. Add retry logic for failed queue operations (a sketch follows below)

The current behavior (returning False on any failure) might trigger job-level error handling that doesn't account for partially queued work.
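For the retry strategy (point 3 above), a small bounded retry around the publish call might look like this; the backoff values are purely illustrative:

import asyncio

async def publish_with_retry(manager, job_id: str, message: dict, attempts: int = 3) -> bool:
    for attempt in range(attempts):
        try:
            await manager.publish_task(job_id, message)
            return True
        except Exception:
            if attempt == attempts - 1:
                return False
            await asyncio.sleep(2**attempt)  # wait 1s, then 2s, before the last try
    return False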

ami/jobs/tasks.py (4)

126-128: Use logging.exception to capture traceback automatically.

When logging an exception in an except block, logging.exception is preferred over logging.error because it automatically includes the stack trace, making debugging easier.

Apply this diff:

         except Exception as ack_error:
-            job.logger.error(f"Error acknowledging task via NATS: {ack_error}")
+            job.logger.exception(f"Error acknowledging task via NATS: {ack_error}")
             # Don't fail the task if ACK fails - data is already saved

141-147: Use logging.exception in exception handlers.

Similar to the NATS ACK handler, these exception handlers should use logging.exception instead of logging.error to automatically capture stack traces for easier debugging.

Apply this diff:

     except Job.DoesNotExist:
-        logger.error(f"Job {job_id} not found")
+        logger.exception(f"Job {job_id} not found")
         raise
     except Exception as e:
-        logger.error(f"Failed to process pipeline result for job {job_id}: {e}")
+        logger.exception(f"Failed to process pipeline result for job {job_id}: {e}")
         # Celery will automatically retry based on autoretry_for
         raise

160-166: Cleanup of NATS queues and Redis state when job completes.

When the job reaches 100% completion (lines 160-163), there's no call to clean up the NATS JetStream queue or Redis state managed by TaskStateManager. This was mentioned in previous reviews and is listed as a TODO in the PR objectives.

Without cleanup:

  • NATS queues will accumulate over time
  • Redis keys persist beyond their usefulness
  • Resource leaks could degrade system performance

Would you like me to generate a cleanup implementation that could be called here? It would need to do the following (sketched after this list):

  1. Delete the NATS stream for this job
  2. Call TaskStateManager(job_id).cleanup() to remove Redis keys
  3. Include error handling to avoid failing job finalization if cleanup fails
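A sketch of those three steps, reusing helpers already introduced in this PR:

def _cleanup_completed_job(job) -> None:
    """Best-effort cleanup; never fail job finalization because of it."""
    try:
        cleanup_nats_resources(job)           # 1. delete the job's NATS stream/consumer
        TaskStateManager(job.pk).cleanup()    # 2. remove the Redis progress keys
    except Exception:
        # 3. log and move on rather than breaking finalization
        job.logger.exception("Post-completion cleanup failed for job %s", job.pk)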

80-86: Consider extracting duplicate retry logic (optional).

The pattern of calling update_state, checking for None, logging a warning, and retrying with the same parameters appears at both lines 80-86 and lines 131-138. You could extract this to a small helper function to reduce duplication.

Example helper:

def _update_state_with_retry(state_manager, processed_image_ids, stage, request_id, task):
    progress_info = state_manager.update_state(processed_image_ids, stage=stage, request_id=request_id)
    if not progress_info:
        logger.warning(
            f"Another task is already processing results for job {state_manager.job_id}. "
            f"Retrying task {request_id} in 5 seconds..."
        )
        raise task.retry(countdown=5, max_retries=10)
    return progress_info

Then use:

progress_info = _update_state_with_retry(state_manager, processed_image_ids, "process", self.request.id, self)

However, since this is only used twice and the logic is straightforward, the current approach is also acceptable.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between df7eaa3 and 0391642.

📒 Files selected for processing (2)
  • ami/jobs/tasks.py (3 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
ami/jobs/tasks.py (5)
ami/jobs/views.py (2)
  • result (256-339)
  • retry (114-125)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (31-296)
  • acknowledge_task (212-231)
ami/ml/orchestration/task_state.py (2)
  • TaskStateManager (17-125)
  • update_state (54-83)
ami/jobs/models.py (4)
  • Job (734-1019)
  • logger (1004-1013)
  • retry (873-885)
  • save (954-965)
ami/ml/models/pipeline.py (3)
  • save_results (814-922)
  • save_results (1120-1121)
  • save (1128-1134)
ami/ml/orchestration/jobs.py (3)
ami/jobs/models.py (4)
  • Job (734-1019)
  • logger (1004-1013)
  • update_stage (168-188)
  • save (954-965)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (31-296)
  • cleanup_job_resources (280-296)
  • publish_task (122-152)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-125)
  • cleanup (119-125)
  • initialize_job (39-49)
🪛 Ruff (0.14.5)
ami/jobs/tasks.py

126-126: Do not catch blind exception: Exception

(BLE001)


127-127: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


138-138: Abstract raise to an inner function

(TRY301)


142-142: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


145-145: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/jobs.py

70-70: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


78-78: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (1)
ami/jobs/tasks.py (1)

203-225: LGTM! Clean timing utility.

The log_time helper is well-designed with clear semantics. Using time.perf_counter() is the right choice for measuring elapsed time, and the partial function pattern makes it convenient to chain timing measurements. The docstring with usage examples is helpful.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
ami/ml/orchestration/jobs.py (1)

46-58: Use timezone-aware datetime for queue_timestamp.

Line 56 uses datetime.datetime.now() which returns a naive (timezone-unaware) datetime. Django applications should use timezone-aware timestamps to avoid comparison and storage issues.

Apply this diff to fix:

+from django.utils import timezone
+
 def queue_images_to_nats(job: "Job", images: list[SourceImage]):
     ...
         message = {
             "job_id": job.pk,
             "image_id": image_id,
             "image_url": image.url() if hasattr(image, "url") else None,
             "timestamp": (image.timestamp.isoformat() if hasattr(image, "timestamp") and image.timestamp else None),
             "batch_index": i,
             "total_images": len(images),
-            "queue_timestamp": datetime.datetime.now().isoformat(),
+            "queue_timestamp": timezone.now().isoformat(),
         }

You may also be able to remove the datetime import from line 1 if it's not used elsewhere.

🧹 Nitpick comments (2)
ami/ml/orchestration/jobs.py (2)

11-26: LGTM - cleanup function properly returns result.

The function correctly returns the boolean result from the async cleanup operation (line 26), which allows callers to handle success/failure. Good fix from the previous review.

Regarding the TODO comment at line 11: I can help implement the cleanup mechanism if needed. This could be triggered from job completion handlers or as part of a Celery task. Would you like me to open an issue to track this, or would you prefer a code suggestion?


88-102: LGTM - clean result handling.

The direct unpacking at line 88 is correct (previous None check issue was properly resolved). The logic clearly returns False for any failures and True for complete success.

Lines 90-92 handle the edge case of an empty images list by marking the job complete. While this seems intentional and reasonable, consider adding a brief comment explaining this behavior for future maintainers.

Optional: Add explanatory comment:

     successful_queues, failed_queues = async_to_sync(queue_all_images)()
 
     if not images:
+        # No images to process - mark job as complete immediately
         job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
         job.save()
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0391642 and 4ae27b0.

📒 Files selected for processing (2)
  • ami/ml/orchestration/jobs.py (1 hunks)
  • docker-compose.yml (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • docker-compose.yml
🧰 Additional context used
🧬 Code graph analysis (1)
ami/ml/orchestration/jobs.py (3)
ami/jobs/models.py (5)
  • Job (734-1019)
  • JobState (27-63)
  • logger (1004-1013)
  • update_stage (168-188)
  • save (954-965)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (31-296)
  • cleanup_job_resources (280-296)
  • publish_task (122-152)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-125)
  • cleanup (119-125)
  • initialize_job (39-49)
🪛 Ruff (0.14.5)
ami/ml/orchestration/jobs.py

77-77: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (2)
ami/ml/orchestration/jobs.py (2)

60-63: LGTM - proper Redis state initialization.

Good pattern to initialize the task state manager with all image IDs before queuing begins. This ensures progress tracking is ready before any tasks are processed.


65-86: LGTM - batch processing with appropriate error handling.

The broad Exception catch at line 77 is flagged by static analysis, but it's actually appropriate in this context. The batch-processing pattern here requires continuing through all images even if individual ones fail, and the exception is logged (line 78) while tracking success/failure counts. This provides resilience for partial failures.

carlosgjs and others added 4 commits November 21, 2025 15:19
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
ami/jobs/tasks.py (1)

124-125: Consider using logging.exception for better debugging context.

At lines 125, 140, and 143, the code uses logging.error() within exception handlers. Using logging.exception() would automatically include the stack trace, making debugging easier without changing the log level.

Example for line 125:

         except Exception as ack_error:
-            job.logger.error(f"Error acknowledging task via NATS: {ack_error}")
+            job.logger.exception(f"Error acknowledging task via NATS: {ack_error}")
             # Don't fail the task if ACK fails - data is already saved

Apply similar changes to lines 140 and 143.

Also applies to: 140-140, 143-143

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4ae27b0 and f221a1a.

📒 Files selected for processing (3)
  • ami/jobs/tasks.py (3 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
  • docker-compose.yml (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • docker-compose.yml
🧰 Additional context used
🧬 Code graph analysis (2)
ami/ml/orchestration/jobs.py (3)
ami/jobs/models.py (4)
  • Job (734-1019)
  • logger (1004-1013)
  • update_stage (168-188)
  • save (954-965)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (31-296)
  • cleanup_job_resources (280-296)
  • publish_task (122-152)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-125)
  • cleanup (119-125)
  • initialize_job (39-49)
ami/jobs/tasks.py (5)
ami/jobs/views.py (1)
  • result (256-339)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (31-296)
  • acknowledge_task (212-231)
ami/ml/orchestration/task_state.py (2)
  • TaskStateManager (17-125)
  • update_state (54-83)
ami/jobs/models.py (4)
  • Job (734-1019)
  • logger (1004-1013)
  • update_stage (168-188)
  • save (954-965)
ami/ml/models/pipeline.py (3)
  • save_results (814-922)
  • save_results (1120-1121)
  • save (1128-1134)
🪛 Ruff (0.14.5)
ami/ml/orchestration/jobs.py

76-76: Do not catch blind exception: Exception

(BLE001)

ami/jobs/tasks.py

124-124: Do not catch blind exception: Exception

(BLE001)


125-125: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


136-136: Abstract raise to an inner function

(TRY301)


140-140: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


143-143: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (4)
ami/ml/orchestration/jobs.py (1)

11-25: LGTM: Cleanup function correctly returns success status.

The async cleanup flow properly wraps the TaskQueueManager call and returns the boolean result, allowing callers to handle success/failure appropriately.

ami/jobs/tasks.py (3)

67-77: LGTM: Error handling correctly marks failed images as processed.

The code properly extracts the failed image ID and marks it as processed (lines 74-75), preventing the job from hanging indefinitely when images fail. This addresses the earlier review feedback.


111-127: LGTM: NATS acknowledgment flow is robust.

The acknowledgment logic correctly:

  • Acknowledges tasks even if processing errored (preventing infinite redelivery)
  • Handles ACK failures gracefully without failing the task (line 126 comment)
  • Uses proper async/sync bridging

The broad exception catch on line 124 is intentional and well-documented.


201-223: LGTM: Useful timing utility.

The log_time helper provides a clean pattern for measuring and logging execution times with partial function application. Well-documented with clear usage examples.

Comment on lines +148 to +163
def _update_job_progress(job_id: int, stage: str, progress_percentage: float) -> None:
    from ami.jobs.models import Job, JobState  # avoid circular import

    with transaction.atomic():
        job = Job.objects.select_for_update().get(pk=job_id)
        job.progress.update_stage(
            stage,
            status=JobState.SUCCESS if progress_percentage >= 1.0 else JobState.STARTED,
            progress=progress_percentage,
        )
        if stage == "results" and progress_percentage >= 1.0:
            job.status = JobState.SUCCESS
            job.progress.summary.status = JobState.SUCCESS
            job.finished_at = datetime.now()
        job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%")
        job.save()

⚠️ Potential issue | 🟠 Major

Use timezone-aware datetime for job completion timestamp.

Line 161 uses datetime.now() which returns a naive datetime. Django applications should use timezone-aware timestamps to avoid timezone-related bugs and inconsistencies.

Apply this diff:

 def _update_job_progress(job_id: int, stage: str, progress_percentage: float) -> None:
     from ami.jobs.models import Job, JobState  # avoid circular import
+    from django.utils import timezone

     with transaction.atomic():
         job = Job.objects.select_for_update().get(pk=job_id)
         job.progress.update_stage(
             stage,
             status=JobState.SUCCESS if progress_percentage >= 1.0 else JobState.STARTED,
             progress=progress_percentage,
         )
         if stage == "results" and progress_percentage >= 1.0:
             job.status = JobState.SUCCESS
             job.progress.summary.status = JobState.SUCCESS
-            job.finished_at = datetime.now()
+            job.finished_at = timezone.now()
         job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%")
         job.save()
🤖 Prompt for AI Agents
In ami/jobs/tasks.py around lines 148-163, replace the naive datetime.now() used
to set job.finished_at with a timezone-aware timestamp by using
django.utils.timezone.now(); add an import for timezone (either at the module
top: from django.utils import timezone, or inside the function alongside the
existing local import) and set job.finished_at = timezone.now().

Comment on lines +89 to +92
    if not images:
        job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
        job.save()


⚠️ Potential issue | 🟡 Minor

Consider updating both process and results stages for empty image batches.

When the images list is empty, only the "results" stage is marked as SUCCESS. The "process" stage remains uninitialized or incomplete, which could leave the job in an inconsistent state. While the Redis state would show 100% progress (empty list), the job's progress stages may not reflect this.

Consider updating both stages when there are no images to process:

     if not images:
+        job.progress.update_stage("process", status=JobState.SUCCESS, progress=1.0)
         job.progress.update_stage("results", status=JobState.SUCCESS, progress=1.0)
         job.save()
🤖 Prompt for AI Agents
In ami/ml/orchestration/jobs.py around lines 89 to 92, when images is empty only
the "results" stage is set to SUCCESS which leaves the "process" stage
uninitialized/incomplete; modify the empty-list branch to also update the
"process" stage to status=JobState.SUCCESS with progress=1.0 (and persist with
job.save()) so both stages reflect completed progress and the job state remains
consistent.

        messages.append((image.pk, message))

    # Store all image IDs in Redis for progress tracking
    state_manager = TaskStateManager(job.pk)
Collaborator

@vanessavmac you may be interested in looking into the details of the TaskStateManager
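For anyone picking this up: the core idea, as it reads from this thread, is a Redis set of pending image IDs per job, with progress derived from how much of the set remains. A rough standalone sketch (key names and the bare redis client are illustrative, not the actual TaskStateManager):

import redis

r = redis.Redis()
TIMEOUT = 86400 * 7  # mirror the 7-day TTL noted in the review

def initialize_job(job_id: int, image_ids: list[int]) -> None:
    """Record the full set of pending images before any tasks run."""
    key = f"job:{job_id}:pending"
    if image_ids:
        r.sadd(key, *image_ids)
        r.expire(key, TIMEOUT)
    r.set(f"job:{job_id}:total", len(image_ids), ex=TIMEOUT)

def update_state(job_id: int, processed_ids: list[int]) -> float:
    """Remove processed IDs and return overall progress in [0, 1]."""
    key = f"job:{job_id}:pending"
    if processed_ids:
        r.srem(key, *processed_ids)
    total = int(r.get(f"job:{job_id}:total") or 0)
    return 1.0 if total == 0 else 1.0 - r.scard(key) / total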
