Conversation

@carlosgjs carlosgjs commented Nov 14, 2025

Summary

This pull request introduces a new reprocess_all_images feature flag that controls whether already-processed images are filtered out during processing jobs (filtering is the default behavior), and propagates the flag throughout the image processing pipeline. This lets projects configure whether jobs reprocess existing images or skip them, improving flexibility for development and production use.
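For example, enabling the flag for one project might look roughly like this (a sketch assuming the feature_flags field can be mutated and saved like a normal model field; the lookup is hypothetical):

from ami.main.models import Project

# Sketch: enable reprocessing for a single project. Jobs for that project
# will then send every image to the pipeline instead of filtering out
# already-processed ones.
project = Project.objects.get(pk=1)  # hypothetical project id
project.feature_flags.reprocess_all_images = True
project.save()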

Configuration and Tooling

  • Updated the flake8 linter version in .pre-commit-config.yaml from 6.0.0 to 6.1.0, because 6.0.0 raises false positives inside f-strings, e.g.

./ami/jobs/models.py:136:61: E713 test for membership should be 'not in'

where "not in" is part of a string message, not code.

Tests

With the default reprocess_all_images=false (run time 1:07):

[2025-11-14 18:01:57] INFO Found 33 out of 100 images to process

With reprocess_all_images=true (run time 3:38):

[2025-11-14 17:54:04] INFO Found 100 out of 100 images to process

[2025-11-14 17:57:25] INFO Found 1 existing detections to reprocess.

Checklist

  • I have tested these changes appropriately.
  • I have added and/or modified relevant tests.
  • I updated relevant documentation or comments.
  • I have verified that this PR follows the project's coding standards.
  • Any dependent changes have already been merged to main.

Summary by CodeRabbit

  • New Features

    • Added a configurable feature flag to control whether projects reprocess all images or skip previously processed ones.
  • Chores

    • Updated flake8 pre-commit hook to version 6.1.0.
    • Updated RabbitMQ server configuration for improved message handling.

netlify bot commented Nov 14, 2025

Deploy Preview for antenna-preview ready!

🔨 Latest commit: cffef7e
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-preview/deploys/691e7bf32d5d58000821b033
😎 Deploy Preview: https://deploy-preview-1053--antenna-preview.netlify.app
Lighthouse
1 paths audited
Performance: 61 (🟢 up 23 from production)
Accessibility: 80 (no change from production)
Best Practices: 100 (no change from production)
SEO: 92 (no change from production)
PWA: 80 (no change from production)

coderabbitai bot commented Nov 14, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

The pull request introduces a new feature flag reprocess_all_images to the ProjectFeatureFlags model. This flag replaces hardcoded skip_processed parameters throughout the ML pipeline, job models, and task processing, enabling dynamic control over whether images should be reprocessed on a per-project basis.

Changes

  • Configuration updates (.pre-commit-config.yaml, .envs/.local/.django): Updated the flake8 pre-commit hook to 6.1.0 and added the RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS environment variable for consumer timeout configuration.
  • Feature flag addition (ami/main/models.py): Added a new public boolean feature flag reprocess_all_images to ProjectFeatureFlags, defaulting to False.
  • Job model integration (ami/jobs/models.py): Replaced hardcoded skip_processed=True with the dynamic reprocess_all_images flag, sourced from job.project.feature_flags.reprocess_all_images, in the collect_images and process_images calls.
  • Pipeline refactoring (ami/ml/models/pipeline.py): Replaced the skip_processed parameter with reprocess_all_images in the collect_images and process_images functions; filtering is now bypassed when reprocess_all_images=True or the pipeline is None, and the Pipeline class methods accept and propagate the new parameter. A sketch of this filtering logic follows the list below.
  • Task processing updates (ami/ml/tasks.py): Added a reprocess_all_images parameter to process_source_images_async; the flag is derived from job.project.feature_flags when a job_id is provided and passed to the downstream process_images call.
  • View test updates (ami/ml/views.py): Updated test_process to pass the reprocess_all_images argument from project.feature_flags to pipeline.process_images.
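A minimal sketch of the filtering behavior described above (the function shape and the already_processed_by helper are assumptions; the real logic lives in ami/ml/models/pipeline.py):

def already_processed_by(image, pipeline) -> bool:
    # Hypothetical stand-in for the real query: has this pipeline already
    # produced results for this image?
    return image.pk in getattr(pipeline, "processed_image_ids", set())

def collect_images(images, pipeline=None, reprocess_all_images=False):
    # Bypass filtering entirely when reprocessing everything or when no
    # pipeline is given, per the walkthrough above.
    if reprocess_all_images or pipeline is None:
        return list(images)
    # Otherwise keep only images the pipeline has not yet processed.
    return [image for image in images if not already_processed_by(image, pipeline)]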

Sequence Diagram

sequenceDiagram
    participant Job as Job Model
    participant ProjFF as Project<br/>FeatureFlags
    participant AsyncTask as ML Task
    participant Pipeline as Pipeline
    participant Filter as Image Filter

    Job->>ProjFF: Read reprocess_all_images flag
    Note over ProjFF: Feature flag controls behavior
    
    Job->>AsyncTask: trigger processing<br/>(with job context)
    AsyncTask->>ProjFF: Fetch reprocess_all_images
    AsyncTask->>Pipeline: process_images<br/>(reprocess_all_images=flag)
    
    alt reprocess_all_images = False
        Pipeline->>Filter: filter_processed_images()
        Filter-->>Pipeline: Return unprocessed only
    else reprocess_all_images = True
        Pipeline-->>Pipeline: Process all images
        Note over Pipeline: Skip filtering step
    end
    
    Pipeline-->>AsyncTask: Processing complete

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Multiple function/method signature updates across the ML module with parameter substitution (skip_processed → reprocess_all_images)
  • Logic changes in filtering behavior within pipeline.py that warrant verification of the conditional paths
  • Integration point between feature flags, job models, and pipeline execution
  • Particular attention needed to:
    • Verify the filtering logic inversion in collect_images and process_images
    • Confirm reprocess_existing_detections initialization logic in pipeline.py respects both the new flag and existing config behavior
    • Validate parameter propagation through all call sites (job model → task → pipeline)

Suggested reviewers

  • mihow

Poem

🐰 A flag hops into place, so fine,
Reprocess or skip—the choice is thine!
Through pipelines it bounces with glee,
Images flowing wild and free,
Feature flags dance, no more hardcoded lies!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning — Docstring coverage is 40.00%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (2 passed)
  • Title check ✅ Passed — The title accurately reflects the main change: introducing a project-level feature flag for controlling image reprocessing behavior throughout the pipeline.
  • Description check ✅ Passed — The PR description is comprehensive and follows most required template sections, with a summary, list of changes, detailed explanations, testing evidence, and a completed checklist.

@carlosgjs carlosgjs requested a review from mihow November 14, 2025 22:54
@carlosgjs carlosgjs marked this pull request as ready for review November 14, 2025 23:06
Copilot AI review requested due to automatic review settings November 14, 2025 23:06
Copilot finished reviewing on behalf of carlosgjs November 14, 2025 23:09
Copilot AI left a comment

Pull Request Overview

This pull request introduces a new skip_processed feature flag that allows projects to control whether already-processed images should be skipped during image processing jobs. The flag defaults to True to maintain backward compatibility and is propagated throughout the entire image processing pipeline from API endpoints through async tasks to the core processing logic. Additionally, the flake8 linter version is updated from 6.0.0 to 6.1.0 to fix false positive lint errors in f-strings.

Key Changes:

  • Added skip_processed boolean field to ProjectFeatureFlags model with default value True
  • Propagated the flag through all image processing layers: views → tasks → pipeline → process_images
  • Updated flake8 version to 6.1.0 in pre-commit configuration

Reviewed Changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
ami/main/models.py Added skip_processed feature flag to ProjectFeatureFlags with default value True
ami/ml/tasks.py Updated async task to read and pass skip_processed flag from job's project feature flags
ami/ml/models/pipeline.py Added skip_processed parameter to process_images functions and integrated filtering logic
ami/ml/views.py Updated test_process endpoint to use skip_processed from project feature flags
ami/jobs/models.py Integrated skip_processed flag in job execution for both collect_images and process_images calls
.pre-commit-config.yaml Updated flake8 version from 6.0.0 to 6.1.0
Comments suppressed due to low confidence (1)

ami/ml/models/pipeline.py:220

  • The logic for reprocess_existing_detections has a potential issue. When skip_processed=False, line 215 sets reprocess_existing_detections=True, but this value can only be overridden to True (not False) in lines 217-220 if the feature flag is enabled. This means:
  1. If skip_processed=False and project.feature_flags.reprocess_existing_detections=False, detections will still be reprocessed (behavior controlled by skip_processed)
  2. If skip_processed=True and project.feature_flags.reprocess_existing_detections=True, detections will be reprocessed (behavior controlled by the separate feature flag)

This creates confusing precedence where these two flags interact in non-obvious ways. Consider either:

  • Checking both flags independently, or
  • Documenting the precedence clearly, or
  • Refactoring so the reprocess_existing_detections feature flag can override in both directions (one possible shape is sketched after the current code below)

reprocess_existing_detections = not skip_processed
# Check if feature flag is enabled to reprocess existing detections
if project and project.feature_flags.reprocess_existing_detections:
    # Check if the user wants to reprocess existing detections or ignore them
    if pipeline_config.get("reprocess_existing_detections", True):
        reprocess_existing_detections = True
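
One possible shape for that third option (a sketch only, not code from this PR; the precedence order shown is an assumption):

# Sketch: an explicit pipeline_config entry wins, then the project feature
# flag, then the inverse of skip_processed as the fallback.
if "reprocess_existing_detections" in pipeline_config:
    reprocess_existing_detections = pipeline_config["reprocess_existing_detections"]
elif project is not None and project.feature_flags.reprocess_existing_detections:
    reprocess_existing_detections = True
else:
    reprocess_existing_detections = not skip_processed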


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
ami/ml/models/pipeline.py (1)

246-248: Add an explicit timeout to external POST

The create_session() function in ami/utils/requests.py configures only retry logic via HTTPAdapter and does not set default timeouts. The POST call at line 247 does not pass a timeout parameter, creating a potential hang risk. This aligns with the codebase pattern observed in processing_service.py, where timeouts are explicitly passed to requests calls (e.g., session.get(..., timeout=timeout)).

-    resp = session.post(endpoint_url, json=request_data.dict())
+    resp = session.post(endpoint_url, json=request_data.dict(), timeout=(5, 300))
🧹 Nitpick comments (4)
ami/ml/views.py (1)

129-137: Constrain test image to the pipeline’s project

Currently selects a random SourceImage across all projects. Narrow to the pipeline’s project to avoid cross‑project processing during test.

-        random_image = (
-            SourceImage.objects.all().order_by("?").first()
-        )  # TODO: Filter images by projects user has access to
+        random_image = (
+            SourceImage.objects.filter(project=project).order_by("?").first()
+        )
ami/ml/models/pipeline.py (3)

209-214: Avoid double calling public_url() and simplify zipping

Compute URLs once to prevent duplicate work and None checks.

-    task_logger.info(f"Sending {len(images)} images to Pipeline {pipeline}")
-    urls = [source_image.public_url() for source_image in images if source_image.public_url()]
+    task_logger.info(f"Sending {len(images)} images to Pipeline {pipeline}")
+    url_pairs = [(si, url) for si in images for url in [si.public_url()] if url]
 
-    source_image_requests: list[SourceImageRequest] = []
+    source_image_requests: list[SourceImageRequest] = []
     detection_requests: list[DetectionRequest] = []
@@
-    for source_image, url in zip(images, urls):
+    for source_image, url in url_pairs:
         if url:
             source_image_request = SourceImageRequest(
                 id=str(source_image.pk),
                 url=url,
             )
             source_image_requests.append(source_image_request)
 
             if reprocess_existing_detections:
                 detection_requests += collect_detections(source_image, source_image_request)

Also applies to: 222-233


89-93: Fix non‑formatted f‑string in debug message

The second line isn’t an f‑string, so {pipeline} won’t interpolate.

-            task_logger.debug(
-                f"Image {image} needs processing: has existing detections with no classifications "
-                "from pipeline {pipeline}"
-            )
+            task_logger.debug(
+                f"Image {image} needs processing: has existing detections with no classifications from pipeline {pipeline}"
+            )

101-106: Grammar nit in log text

“do yet” → “do not yet”.

-                    f"Since we do yet have a mechanism to reclassify detections, processing the image from scratch."
+                    f"Since we do not yet have a mechanism to reclassify detections, processing the image from scratch."
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1b461fb and 61d2485.

📒 Files selected for processing (6)
  • .pre-commit-config.yaml (1 hunks)
  • ami/jobs/models.py (2 hunks)
  • ami/main/models.py (1 hunks)
  • ami/ml/models/pipeline.py (6 hunks)
  • ami/ml/tasks.py (2 hunks)
  • ami/ml/views.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
ami/ml/views.py (1)
ami/ml/models/pipeline.py (2)
  • process_images (163-282)
  • process_images (1096-1117)
ami/ml/tasks.py (1)
ami/jobs/models.py (2)
  • Job (720-1005)
  • logger (990-999)
ami/ml/models/pipeline.py (2)
ami/main/models.py (1)
  • SourceImage (1664-1912)
ami/ml/schemas.py (1)
  • PipelineResultsResponse (199-214)
🔇 Additional comments (5)
.pre-commit-config.yaml (1)

43-43: flake8 bump to 6.1.0 looks good

No config drift spotted; args still point to setup.cfg.

ami/jobs/models.py (1)

372-378: Propagating skip_processed from project flags — LGTM

Both collect_images and process_images now respect per‑project configuration.

Also applies to: 418-423

ami/main/models.py (1)

219-220: New feature flag default is reasonable

Adding skip_processed=True at the schema level should backfill via defaults for existing rows stored in SchemaField.

Please confirm that legacy Project.feature_flags JSON lacking skip_processed deserializes with the default True in your environment.
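A quick way to sanity-check that, assuming the flags schema is a pydantic model backing the SchemaField (a sketch, not the project's actual class):

from pydantic import BaseModel

class ProjectFeatureFlags(BaseModel):
    # Other existing flags elided; the new field and its default:
    skip_processed: bool = True

# Legacy JSON that predates the flag picks up the default when deserialized.
legacy = ProjectFeatureFlags(**{})
assert legacy.skip_processed is True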

ami/ml/models/pipeline.py (2)

192-201: Skip filtering logic reads well

Prefiltering when skip_processed=True is correct and keeps behavior clear.


1096-1117: Verification confirms backward compatibility is preserved

All 10+ call sites will automatically use the new skip_processed=True default, which matches the prior behavior. The parameter is correctly forwarded from the method to the underlying function, and no call sites need updating.
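The compatibility hinges on the default value, roughly (a sketch of the signature shape, not the file's actual code):

# Sketch: because the new parameter defaults to the old behavior,
# call sites that omit it are unaffected.
def process_images(pipeline, endpoint_url, images, job_id=None,
                   project_id=None, skip_processed=True):
    ...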

ami/ml/tasks.py Outdated
Comment on lines 18 to 25
+    skip_processed = True
     try:
         job = Job.objects.get(pk=job_id)
-        job.logger.info(f"Processing {len(image_ids)} images for job {job}")
+        skip_processed = job.project.feature_flags.skip_processed
+        job.logger.info(f"Processing {len(image_ids)} images for job {job} (skip_processed={skip_processed})")
     except Job.DoesNotExist as e:
         logger.error(f"Job {job_id} not found: {e}")
         pass
⚠️ Potential issue | 🟡 Minor

Avoid error log when job_id is None; guard Job lookup

If job_id is None, this logs an error “Job None not found”. Guard the lookup and downgrade the log to warning for missing jobs.

     job = None
-    skip_processed = True
-    try:
-        job = Job.objects.get(pk=job_id)
-        skip_processed = job.project.feature_flags.skip_processed
-        job.logger.info(f"Processing {len(image_ids)} images for job {job} (skip_processed={skip_processed})")
-    except Job.DoesNotExist as e:
-        logger.error(f"Job {job_id} not found: {e}")
-        pass
+    skip_processed = True
+    if job_id:
+        try:
+            job = Job.objects.get(pk=job_id)
+            skip_processed = job.project.feature_flags.skip_processed
+            job.logger.info(
+                f"Processing {len(image_ids)} images for job {job} (skip_processed={skip_processed})"
+            )
+        except Job.DoesNotExist as e:
+            logger.warning(f"Job {job_id} not found: {e}")
+    else:
+        logger.info(f"Processing {len(image_ids)} images (no job context, skip_processed={skip_processed})")
 
     images = SourceImage.objects.filter(pk__in=image_ids)

Also applies to: 30-36

🧰 Tools
🪛 Ruff (0.14.4)

24-24: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

🤖 Prompt for AI Agents
In ami/ml/tasks.py around lines 18-25 (and similarly apply to lines 30-36), the
code attempts Job.objects.get(pk=job_id) even when job_id may be None which
produces misleading error logs like "Job None not found"; guard the lookup by
checking if job_id is truthy before calling Job.objects.get, and if job_id is
None or falsy log a logger.warning indicating a missing/invalid job_id and skip
further processing (return or continue as appropriate); also change the except
Job.DoesNotExist branch to use logger.warning instead of logger.error and ensure
control flow avoids subsequent use of a missing job variable.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
ami/ml/tasks.py (1)

18-28: Good fix for None check; consider logging improvements.

The guard if job_id is not None correctly addresses the past concern about logging "Job None not found." The logic flow is sound and defaults to skip_processed=True when the Job is missing.

Consider these logging refinements:

  • Use logger.exception instead of logger.error on line 25 to automatically capture the traceback.
  • Downgrade to logger.warning since execution continues normally with a safe default.
  • Remove the redundant pass on line 26.

Based on static analysis hints and past review comments.

Apply this diff to refine the logging:

     if job_id is not None:
         try:
             job = Job.objects.get(pk=job_id)
             skip_processed = job.project.feature_flags.skip_processed
             job.logger.info(f"Processing {len(image_ids)} images for job {job} (skip_processed={skip_processed})")
         except Job.DoesNotExist as e:
-            logger.error(f"Job {job_id} not found: {e}")
-            pass
+            logger.exception(f"Job {job_id} not found")
     else:
         logger.info(f"Processing {len(image_ids)} images for job_id=None (skip_processed={skip_processed})")

Alternatively, if you prefer warning level without the full traceback:

         except Job.DoesNotExist as e:
-            logger.error(f"Job {job_id} not found: {e}")
-            pass
+            logger.warning(f"Job {job_id} not found: {e}")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 61d2485 and 16cfd2d.

📒 Files selected for processing (1)
  • ami/ml/tasks.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/ml/tasks.py (1)
ami/jobs/models.py (2)
  • Job (720-1005)
  • logger (990-999)
🪛 Ruff (0.14.5)
ami/ml/tasks.py

25-25: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: lint
  • GitHub Check: test
🔇 Additional comments (1)
ami/ml/tasks.py (1)

33-39: LGTM!

The skip_processed parameter is correctly propagated to process_images, enabling the downstream pipeline to conditionally skip already-processed images based on the feature flag.

@mihow mihow left a comment

Comments from call: @carlosgjs will go ahead and rename (and flip) the skip_processed parameter to reprocess_all_images everywhere in this PR.

@carlosgjs carlosgjs requested a review from mihow November 18, 2025 22:15
@mihow mihow requested a review from Copilot November 18, 2025 22:22
Copilot finished reviewing on behalf of mihow November 18, 2025 22:25
Copilot AI left a comment

Pull Request Overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.



carlosgjs and others added 3 commits November 18, 2025 20:40
@coderabbitai coderabbitai bot left a comment
Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4569f25 and d2711a7.

📒 Files selected for processing (2)
  • .envs/.local/.django (1 hunks)
  • ami/ml/tasks.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/ml/tasks.py (1)
ami/jobs/models.py (2)
  • Job (720-1005)
  • logger (990-999)
🪛 Ruff (0.14.5)
ami/ml/tasks.py

27-27: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test

CELERY_RESULT_BACKEND=rpc:// # Use RabbitMQ for results backend
RABBITMQ_DEFAULT_USER=rabbituser
RABBITMQ_DEFAULT_PASS=rabbitpass
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit consumer_timeout 60480000
@coderabbitai coderabbitai bot commented Nov 19, 2025

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

Verify the consumer timeout value is intentional.

The timeout is set to ~16.8 hours (60480000 ms). While this accommodates long-running reprocessing jobs, such an extended timeout in development could mask issues with stuck consumers or inefficient processing.

Consider documenting why this specific timeout value is needed, or verify whether a shorter timeout (e.g., 1-2 hours) would be sufficient for typical reprocessing workloads.


🏁 Script executed:

#!/bin/bash
# Check RabbitMQ documentation for recommended consumer_timeout values
echo "Verifying RabbitMQ consumer_timeout configuration..."
echo "Current value: 60480000 ms (16.8 hours)"
echo "Calculating hours: $((60480000 / 1000 / 60 / 60))"

Length of output: 114


🌐 Web query:

RabbitMQ consumer_timeout recommended value best practices

💡 Result:

Short answer

  • Default/recommended global value: 1,800,000 ms (30 minutes). [1][2]
  • Don’t set it under 60,000 ms (1 minute); values <5 minutes are discouraged. Enforcement is checked ~once per minute. [1]
  • Use per‑queue settings (policy or x-consumer-timeout) if only some queues need a different timeout. [1]

Best practices

  • Keep the default 30m for most workloads; increase to a few hours if your consumers legitimately take long to ack. [1][2]
  • Prefer raising the timeout per-queue (policy or queue arg) rather than disabling globally. [1]
  • Avoid disabling the timeout entirely unless you understand the risks (stuck consumers can consume resources and block requeues). If you must, prefer a very large value over turning it off. [1][4]
  • If you run a managed broker (CloudAMQP, Amazon MQ, etc.), check provider limits and defaults (some enforce 30m or allow “infinite”/0). [2][3]
  • Monitor for channels being closed with PRECONDITION_FAILED and tune based on observed consumer behaviour and processing patterns. [1][5]

References
[1] RabbitMQ docs — Delivery acknowledgement / consumer timeout (consumer_timeout, per-queue policy/arg).
[2] RabbitMQ docs — Configurable values / default consumer_timeout = 1800000 ms.
[3] Amazon MQ docs — consumer_timeout default and allowed range.
[4] CloudAMQP docs — consumer timeout explanation and range.
[5] MailerQ blog — example recommending disabling for long-running consumers (illustrates when you might need higher/disabled value).


🏁 Script executed:

#!/bin/bash
# Search for documentation or comments about RabbitMQ timeout configuration
rg -i "consumer_timeout|rabbitmq.*timeout|reprocessing" -C 3 --type=txt --type=md

Length of output: 1367


🏁 Script executed:

#!/bin/bash
# Check the .envs/.local/.django file for context and any surrounding comments
cat .envs/.local/.django | head -40

Length of output: 1241


Add documentation for the RabbitMQ consumer timeout or reconsider the value.

The 60480000 ms (16.8 hours) timeout significantly exceeds RabbitMQ's default recommended value of 1,800,000 ms (30 minutes), and best practices recommend keeping the default 30 minutes for most workloads and increasing to "a few hours" only if consumers legitimately require longer to acknowledge messages. While your codebase does include reprocessing pipelines that may justify an extended timeout, no documentation currently explains why this specific value is needed. RabbitMQ guidance cautions against overly long timeouts as they can mask stuck consumers—a valid concern for development environments.

Add a comment explaining the 16.8-hour requirement, or verify whether a shorter timeout (e.g., 1–2 hours, aligned with RabbitMQ recommendations) would suffice for your reprocessing workloads. Additionally, consider using per-queue settings (policy or queue arguments) instead of a global server-level timeout to apply the extended timeout only to queues that need it.
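If the value stays, one low-effort fix is an inline comment in the env file (the value is from this PR; the comment text is a suggestion):

# Extended for long-running ML reprocessing jobs. RabbitMQ's default is
# 1800000 ms (30 min); 60480000 ms ≈ 16.8 hours.
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit consumer_timeout 60480000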

mihow (Collaborator) commented:
Can I set a default for rabbit consumer_timeout as a django setting?

coderabbitai bot replied:
🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find Django settings files and check for Celery/RabbitMQ configuration
echo "=== Django settings files ==="
fd -e py settings

echo -e "\n=== Celery configuration in Django settings ==="
rg -n "CELERY_|celery" --type=py -g "settings*.py" -C 2

echo -e "\n=== RabbitMQ configuration references ==="
rg -n "RABBITMQ_|rabbitmq|consumer_timeout" --type=py -C 2

echo -e "\n=== Docker compose files that might show RabbitMQ container config ==="
fd -e yml -e yaml docker-compose

Length of output: 635


🏁 Script executed:

#!/bin/bash
# Find Django settings directory
echo "=== Finding Django config/settings ==="
fd settings.py

echo -e "\n=== Check docker-compose.yml for RabbitMQ configuration ==="
cat docker-compose.yml | grep -A 20 -B 5 "rabbitmq"

echo -e "\n=== Check for Celery configuration files ==="
fd -e py celery

echo -e "\n=== Search for any RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS usage ==="
rg "RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS" -A 2 -B 2

Length of output: 1682


🏁 Script executed:

#!/bin/bash
# Check Celery configuration
echo "=== Celery app configuration ==="
cat config/celery_app.py

echo -e "\n=== Check if there are Django settings directories ==="
fd -t d settings

echo -e "\n=== Look for any timeout-related Celery settings ==="
rg -i "timeout" config/celery_app.py

Length of output: 782


🏁 Script executed:

#!/bin/bash
# Check Django settings for any existing Celery configuration
echo "=== Django settings files ==="
cat config/settings/base.py | grep -A 5 -B 2 "CELERY" || echo "No CELERY settings in base.py"

echo -e "\n=== Check local settings ==="
cat config/settings/local.py | grep -A 5 -B 2 "CELERY" || echo "No CELERY settings in local.py"

echo -e "\n=== List all settings files ==="
ls -la config/settings/

echo -e "\n=== Search for any Celery timeout configurations ==="
rg "CELERY.*TIMEOUT|task_time_limit|task_soft_time_limit" config/settings/

Length of output: 4963


@mihow No, you cannot set RabbitMQ's consumer_timeout as a Django setting because it's a server-side RabbitMQ configuration, not a client-side setting.

Here's the key distinction:

Server-side (RabbitMQ):

  • consumer_timeout is an Erlang VM argument that configures the RabbitMQ server itself
  • Must be set via RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS environment variable (as you're doing now) or in rabbitmq.conf
  • Controls how long the RabbitMQ server waits before disconnecting inactive consumers

Client-side (Django/Celery):

  • Django settings (including CELERY_* settings) only control how your Django application and Celery workers connect to and interact with RabbitMQ
  • I can see you already have CELERY_TASK_TIME_LIMIT = 4 * 60 * 60 * 24 (4 days) in config/settings/base.py, which controls how long tasks can run before being terminated by Celery

Alternative approaches:

  1. Keep the environment variable approach (current - standard for Docker deployments)
  2. Use RabbitMQ policies to set x-consumer-timeout per queue if you only need extended timeouts for specific queues
  3. Create a rabbitmq.conf file mounted into the container

The environment variable approach you're using is the standard way to configure RabbitMQ server settings in containerized deployments.
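For option 2, a rough sketch of a per-queue timeout using a queue argument (assumes Celery/kombu queue declarations and a RabbitMQ version that supports the x-consumer-timeout optional queue argument; the queue name is hypothetical):

from kombu import Queue

# Sketch: apply the long consumer timeout only to the long-running ML queue
# instead of server-wide via RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS.
CELERY_TASK_QUEUES = (
    Queue("default"),
    Queue("ml_pipeline", queue_arguments={"x-consumer-timeout": 60480000}),  # ms
)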

Comment on lines +18 to +32
    reprocess_all_images = False
    if job_id is not None:
        try:
            job = Job.objects.get(pk=job_id)
            reprocess_all_images = job.project.feature_flags.reprocess_all_images
            job.logger.info(
                f"Processing {len(image_ids)} images for job {job} (reprocess_all_images={reprocess_all_images})"
            )
        except Job.DoesNotExist as e:
            logger.error(f"Job {job_id} not found: {e}")

    else:
        logger.info(
            f"Processing {len(image_ids)} images for job_id=None (reprocess_all_images={reprocess_all_images})"
        )
@coderabbitai coderabbitai bot commented Nov 19, 2025

⚠️ Potential issue | 🟠 Major

Set job_id = None when Job doesn't exist to prevent downstream errors.

When Job.DoesNotExist is caught (line 26), the code continues with the original (invalid) job_id value, which is then passed to process_images() (line 41) and save_results() (line 46). While you mentioned that "process_images handles job_id=None", the code doesn't actually set job_id = None in the exception handler.

Additionally, use logger.exception() instead of logger.error() to capture the full traceback. Based on learnings (past review comments and static analysis).

Apply this diff:

     job = None
     reprocess_all_images = False
     if job_id is not None:
         try:
             job = Job.objects.get(pk=job_id)
             reprocess_all_images = job.project.feature_flags.reprocess_all_images
             job.logger.info(
                 f"Processing {len(image_ids)} images for job {job} (reprocess_all_images={reprocess_all_images})"
             )
         except Job.DoesNotExist as e:
-            logger.error(f"Job {job_id} not found: {e}")
+            logger.exception(f"Job {job_id} not found: {e}")
+            job_id = None
 
     else:
         logger.info(
             f"Processing {len(image_ids)} images for job_id=None (reprocess_all_images={reprocess_all_images})"
         )
🧰 Tools
🪛 Ruff (0.14.5)

27-27: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

🤖 Prompt for AI Agents
ami/ml/tasks.py around lines 18 to 32: when Job.DoesNotExist is caught the code
logs the error but leaves job_id unchanged which can cause downstream errors;
update the exception handler to set job_id = None (so downstream calls receive
None) and replace logger.error(...) with logger.exception(...) to record the
full traceback; keep the existing log message context and ensure any local `job`
variable isn’t referenced after the exception.

mihow (Collaborator) commented:
@coderabbitai make a new ticket about this issue

coderabbitai bot replied:
🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Create a GitHub issue for the job_id handling problem

gh issue create \
  --repo RolnickLab/antenna \
  --title "Handle Job.DoesNotExist exception properly in process_source_images_async" \
  --body "## Problem Description

In \`ami/ml/tasks.py\`, the \`process_source_images_async\` function catches \`Job.DoesNotExist\` exceptions but doesn't reset \`job_id\` to \`None\`. This means the invalid \`job_id\` is passed to downstream functions (\`process_images()\` and \`save_results()\`), which could cause errors.

Additionally, the exception handler uses \`logger.error()\` instead of \`logger.exception()\`, which means the full traceback is not captured for debugging.

## Context

- **File:** \`ami/ml/tasks.py\`
- **Function:** \`process_source_images_async\`
- **Related PR:** #1053
- **Review Comment:** https://github.com/RolnickLab/antenna/pull/1053#discussion_r2540537287

## Expected Behavior

When a \`Job.DoesNotExist\` exception occurs:
1. The \`job_id\` variable should be set to \`None\` to prevent passing an invalid ID downstream
2. The full exception traceback should be logged using \`logger.exception()\` for debugging

---

Requested by: @mihow" \
  --assignee mihow

Length of output: 49


@mihow, I've created issue #1061 to track this problem: #1061

The issue has been assigned to you and includes the problem description along with links back to this PR and comment.



mihow commented Nov 20, 2025

@carlosgjs I made a dedicated PR for the consumer timeout change

@mihow mihow merged commit 83ca9e7 into RolnickLab:main Nov 20, 2025
3 checks passed
