feat: Project feature flag for reprocessing all images #1053
Conversation
✅ Deploy Preview for antenna-preview ready!
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

The pull request introduces a new `reprocess_all_images` feature flag.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Job as Job Model
    participant ProjFF as Project<br/>FeatureFlags
    participant AsyncTask as ML Task
    participant Pipeline as Pipeline
    participant Filter as Image Filter
    Job->>ProjFF: Read reprocess_all_images flag
    Note over ProjFF: Feature flag controls behavior
    Job->>AsyncTask: trigger processing<br/>(with job context)
    AsyncTask->>ProjFF: Fetch reprocess_all_images
    AsyncTask->>Pipeline: process_images<br/>(reprocess_all_images=flag)
    alt reprocess_all_images = False
        Pipeline->>Filter: filter_processed_images()
        Filter-->>Pipeline: Return unprocessed only
    else reprocess_all_images = True
        Pipeline-->>Pipeline: Process all images
        Note over Pipeline: Skip filtering step
    end
    Pipeline-->>AsyncTask: Processing complete
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning), ✅ Passed checks (2 passed)
Pull Request Overview
This pull request introduces a new skip_processed feature flag that allows projects to control whether already-processed images should be skipped during image processing jobs. The flag defaults to True to maintain backward compatibility and is propagated throughout the entire image processing pipeline from API endpoints through async tasks to the core processing logic. Additionally, the flake8 linter version is updated from 6.0.0 to 6.1.0 to fix false positive lint errors in f-strings.
Key Changes:
- Added `skip_processed` boolean field to `ProjectFeatureFlags` model with default value `True`
- Propagated the flag through all image processing layers: views → tasks → pipeline → process_images (a sketch of this path follows the list below)
- Updated flake8 version to 6.1.0 in pre-commit configuration
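A minimal sketch of that propagation, using names from this stage of the review (`skip_processed`, plus a `filter_processed_images` helper assumed from the walkthrough diagram); note the flag was later renamed to `reprocess_all_images`:

```python
from typing import Iterable


def filter_processed_images(images: Iterable, pipeline) -> list:
    # Assumed helper (named after the call in the walkthrough diagram):
    # keep only images that this pipeline has not already processed.
    return [img for img in images if not getattr(img, "processed", False)]


def process_images(pipeline, endpoint_url, images, job_id=None,
                   project_id=None, skip_processed=True):
    # skip_processed defaults to True so existing call sites keep the old behavior
    if skip_processed:
        images = filter_processed_images(images, pipeline)
    return images  # the real function builds a request and POSTs to the ML backend
```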
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| ami/main/models.py | Added skip_processed feature flag to ProjectFeatureFlags with default value True |
| ami/ml/tasks.py | Updated async task to read and pass skip_processed flag from job's project feature flags |
| ami/ml/models/pipeline.py | Added skip_processed parameter to process_images functions and integrated filtering logic |
| ami/ml/views.py | Updated test_process endpoint to use skip_processed from project feature flags |
| ami/jobs/models.py | Integrated skip_processed flag in job execution for both collect_images and process_images calls |
| .pre-commit-config.yaml | Updated flake8 version from 6.0.0 to 6.1.0 |
Comments suppressed due to low confidence (1)
ami/ml/models/pipeline.py:220
The logic for `reprocess_existing_detections` has a potential issue. When `skip_processed=False`, line 215 sets `reprocess_existing_detections=True`, but this value can only be overridden to `True` (not `False`) in lines 217-220 if the feature flag is enabled. This means:
- If `skip_processed=False` and `project.feature_flags.reprocess_existing_detections=False`, detections will still be reprocessed (behavior controlled by `skip_processed`)
- If `skip_processed=True` and `project.feature_flags.reprocess_existing_detections=True`, detections will be reprocessed (behavior controlled by the separate feature flag)

This creates confusing precedence where these two flags interact in non-obvious ways. Consider either:
- Checking both flags independently, or
- Documenting the precedence clearly, or
- Refactoring so the `reprocess_existing_detections` feature flag can override in both directions (see the sketch after the snippet below)
```python
reprocess_existing_detections = not skip_processed
# Check if feature flag is enabled to reprocess existing detections
if project and project.feature_flags.reprocess_existing_detections:
    # Check if the user wants to reprocess existing detections or ignore them
    if pipeline_config.get("reprocess_existing_detections", True):
        reprocess_existing_detections = True
```
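One possible shape for the third option, letting the project flag defer to the per-request config in both directions; a sketch under the same assumed names, not the project's actual code:

```python
if project and project.feature_flags.reprocess_existing_detections:
    # Project flag on: the per-request config decides, in BOTH directions
    reprocess_existing_detections = pipeline_config.get("reprocess_existing_detections", True)
else:
    # Project flag off: fall back to the job-level skip behavior
    reprocess_existing_detections = not skip_processed
```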
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ami/ml/models/pipeline.py (1)
246-248: Add an explicit timeout to external POST

The `create_session()` function in `ami/utils/requests.py` configures only retry logic via HTTPAdapter and does not set default timeouts. The POST call at line 247 does not pass a timeout parameter, creating a potential hang risk. This aligns with the codebase pattern observed in `processing_service.py`, where timeouts are explicitly passed to requests calls (e.g., `session.get(..., timeout=timeout)`).

```diff
- resp = session.post(endpoint_url, json=request_data.dict())
+ resp = session.post(endpoint_url, json=request_data.dict(), timeout=(5, 300))
```
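If the goal is a safety net for every call, one alternative is to bake a default timeout into the session rather than each call site; a sketch only, not the existing `create_session()`:

```python
import requests


class TimeoutSession(requests.Session):
    """Session that applies a default timeout unless the caller passes one."""

    def __init__(self, timeout=(5, 300)):
        super().__init__()
        self._timeout = timeout

    def request(self, method, url, **kwargs):
        # Only fill in the timeout when the call site did not specify one
        kwargs.setdefault("timeout", self._timeout)
        return super().request(method, url, **kwargs)


# session = TimeoutSession()
# resp = session.post(endpoint_url, json=request_data.dict())  # gets (5, 300)
```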
🧹 Nitpick comments (4)
ami/ml/views.py (1)
129-137: Constrain test image to the pipeline's project

Currently selects a random SourceImage across all projects. Narrow to the pipeline's project to avoid cross-project processing during test.
```diff
- random_image = (
-     SourceImage.objects.all().order_by("?").first()
- )  # TODO: Filter images by projects user has access to
+ random_image = (
+     SourceImage.objects.filter(project=project).order_by("?").first()
+ )
```

ami/ml/models/pipeline.py (3)
209-214: Avoid double-calling public_url() and simplify zipping

Compute URLs once to prevent duplicate work and None checks; zipping `images` against a filtered `urls` list can also misalign image/URL pairs whenever an image has no public URL.
- task_logger.info(f"Sending {len(images)} images to Pipeline {pipeline}") - urls = [source_image.public_url() for source_image in images if source_image.public_url()] + task_logger.info(f"Sending {len(images)} images to Pipeline {pipeline}") + url_pairs = [(si, url) for si in images for url in [si.public_url()] if url] - source_image_requests: list[SourceImageRequest] = [] + source_image_requests: list[SourceImageRequest] = [] detection_requests: list[DetectionRequest] = [] @@ - for source_image, url in zip(images, urls): + for source_image, url in url_pairs: if url: source_image_request = SourceImageRequest( id=str(source_image.pk), url=url, ) source_image_requests.append(source_image_request) if reprocess_existing_detections: detection_requests += collect_detections(source_image, source_image_request)Also applies to: 222-233
89-93: Fix non-formatted f-string in debug message

The second line isn't an f-string, so {pipeline} won't interpolate.

```diff
- task_logger.debug(
-     f"Image {image} needs processing: has existing detections with no classifications "
-     "from pipeline {pipeline}"
- )
+ task_logger.debug(
+     f"Image {image} needs processing: has existing detections with no classifications from pipeline {pipeline}"
+ )
```
101-106: Grammar nit in log text

"do yet" → "do not yet".

```diff
- f"Since we do yet have a mechanism to reclassify detections, processing the image from scratch."
+ f"Since we do not yet have a mechanism to reclassify detections, processing the image from scratch."
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- `.pre-commit-config.yaml` (1 hunks)
- `ami/jobs/models.py` (2 hunks)
- `ami/main/models.py` (1 hunks)
- `ami/ml/models/pipeline.py` (6 hunks)
- `ami/ml/tasks.py` (2 hunks)
- `ami/ml/views.py` (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
ami/ml/views.py (1)
ami/ml/models/pipeline.py (2)
`process_images` (163-282), `process_images` (1096-1117)
ami/ml/tasks.py (1)
ami/jobs/models.py (2)
`Job` (720-1005), `logger` (990-999)
ami/ml/models/pipeline.py (2)
ami/main/models.py (1)
`SourceImage` (1664-1912)
ami/ml/schemas.py (1)
`PipelineResultsResponse` (199-214)
🔇 Additional comments (5)
.pre-commit-config.yaml (1)
43-43: flake8 bump to 6.1.0 looks good

No config drift spotted; args still point to setup.cfg.
ami/jobs/models.py (1)
372-378: Propagating skip_processed from project flags — LGTM

Both collect_images and process_images now respect per-project configuration.
Also applies to: 418-423
ami/main/models.py (1)
219-220: New feature flag default is reasonable

Adding `skip_processed=True` at the schema level should backfill via defaults for existing rows stored in SchemaField.
Please confirm that legacy Project.feature_flags JSON lacking skip_processed deserializes with the default True in your environment.
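A quick way to check that behavior, assuming `ProjectFeatureFlags` is a pydantic model persisted as JSON (simplified stand-in below, not the real schema):

```python
from pydantic import BaseModel


class ProjectFeatureFlags(BaseModel):
    # Simplified stand-in for the real schema; only the new flag is shown
    skip_processed: bool = True


# A legacy row stored before this PR has no "skip_processed" key;
# the model default should fill it in on deserialization.
legacy = ProjectFeatureFlags(**{})
assert legacy.skip_processed is True
```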
ami/ml/models/pipeline.py (2)
192-201: Skip filtering logic reads well

Prefiltering when `skip_processed=True` is correct and keeps behavior clear.
1096-1117: Verification confirms backward compatibility is preserved

All 10+ call sites will automatically use the new `skip_processed=True` default, which matches the prior behavior. The parameter is correctly forwarded from the method to the underlying function, and no call sites need updating.
ami/ml/tasks.py (Outdated)

```diff
+ skip_processed = True
  try:
      job = Job.objects.get(pk=job_id)
-     job.logger.info(f"Processing {len(image_ids)} images for job {job}")
+     skip_processed = job.project.feature_flags.skip_processed
+     job.logger.info(f"Processing {len(image_ids)} images for job {job} (skip_processed={skip_processed})")
  except Job.DoesNotExist as e:
      logger.error(f"Job {job_id} not found: {e}")
      pass
```
Avoid error log when job_id is None; guard Job lookup
If job_id is None, this logs an error “Job None not found”. Guard the lookup and downgrade the log to warning for missing jobs.
```diff
  job = None
- skip_processed = True
- try:
-     job = Job.objects.get(pk=job_id)
-     skip_processed = job.project.feature_flags.skip_processed
-     job.logger.info(f"Processing {len(image_ids)} images for job {job} (skip_processed={skip_processed})")
- except Job.DoesNotExist as e:
-     logger.error(f"Job {job_id} not found: {e}")
-     pass
+ skip_processed = True
+ if job_id:
+     try:
+         job = Job.objects.get(pk=job_id)
+         skip_processed = job.project.feature_flags.skip_processed
+         job.logger.info(
+             f"Processing {len(image_ids)} images for job {job} (skip_processed={skip_processed})"
+         )
+     except Job.DoesNotExist as e:
+         logger.warning(f"Job {job_id} not found: {e}")
+ else:
+     logger.info(f"Processing {len(image_ids)} images (no job context, skip_processed={skip_processed})")
  images = SourceImage.objects.filter(pk__in=image_ids)
```

Also applies to: 30-36
🧰 Tools
🪛 Ruff (0.14.4)
24-24: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
🤖 Prompt for AI Agents
In ami/ml/tasks.py around lines 18-25 (and similarly apply to lines 30-36), the
code attempts Job.objects.get(pk=job_id) even when job_id may be None which
produces misleading error logs like "Job None not found"; guard the lookup by
checking if job_id is truthy before calling Job.objects.get, and if job_id is
None or falsy log a logger.warning indicating a missing/invalid job_id and skip
further processing (return or continue as appropriate); also change the except
Job.DoesNotExist branch to use logger.warning instead of logger.error and ensure
control flow avoids subsequent use of a missing job variable.
Actionable comments posted: 0
🧹 Nitpick comments (1)
ami/ml/tasks.py (1)
18-28: Good fix for None check; consider logging improvements.

The guard `if job_id is not None` correctly addresses the past concern about logging "Job None not found." The logic flow is sound and defaults to `skip_processed=True` when the Job is missing.

Consider these logging refinements:
- Use `logger.exception` instead of `logger.error` on line 25 to automatically capture the traceback.
- Downgrade to `logger.warning` since execution continues normally with a safe default.
- Remove the redundant `pass` on line 26.

Based on static analysis hints and past review comments.
Apply this diff to refine the logging:

```diff
  if job_id is not None:
      try:
          job = Job.objects.get(pk=job_id)
          skip_processed = job.project.feature_flags.skip_processed
          job.logger.info(f"Processing {len(image_ids)} images for job {job} (skip_processed={skip_processed})")
      except Job.DoesNotExist as e:
-         logger.error(f"Job {job_id} not found: {e}")
-         pass
+         logger.exception(f"Job {job_id} not found")
  else:
      logger.info(f"Processing {len(image_ids)} images for job_id=None (skip_processed={skip_processed})")
```

Alternatively, if you prefer warning level without the full traceback:
```diff
  except Job.DoesNotExist as e:
-     logger.error(f"Job {job_id} not found: {e}")
-     pass
+     logger.warning(f"Job {job_id} not found: {e}")
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- `ami/ml/tasks.py` (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/ml/tasks.py (1)
ami/jobs/models.py (2)
`Job` (720-1005), `logger` (990-999)
🪛 Ruff (0.14.5)
ami/ml/tasks.py
25-25: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: lint
- GitHub Check: test
🔇 Additional comments (1)
ami/ml/tasks.py (1)
33-39: LGTM!

The `skip_processed` parameter is correctly propagated to `process_images`, enabling the downstream pipeline to conditionally skip already-processed images based on the feature flag.
mihow left a comment:
Comments from call: @carlosgjs will go ahead and rename (and flip) the `skip_processed` parameter to `reprocess_all_images` everywhere in this PR.
Pull Request Overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `.envs/.local/.django` (1 hunks)
- `ami/ml/tasks.py` (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/ml/tasks.py (1)
ami/jobs/models.py (2)
`Job` (720-1005), `logger` (990-999)
🪛 Ruff (0.14.5)
ami/ml/tasks.py
27-27: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
.envs/.local/.django (Outdated)

```
CELERY_RESULT_BACKEND=rpc:// # Use RabbitMQ for results backend
RABBITMQ_DEFAULT_USER=rabbituser
RABBITMQ_DEFAULT_PASS=rabbitpass
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit consumer_timeout 60480000
```
🧩 Analysis chain
Verify the consumer timeout value is intentional.
The timeout is set to ~16.8 hours (60480000 ms). While this accommodates long-running reprocessing jobs, such an extended timeout in development could mask issues with stuck consumers or inefficient processing.
Consider documenting why this specific timeout value is needed, or verify whether a shorter timeout (e.g., 1-2 hours) would be sufficient for typical reprocessing workloads.
🏁 Script executed:

```bash
#!/bin/bash
# Check RabbitMQ documentation for recommended consumer_timeout values
echo "Verifying RabbitMQ consumer_timeout configuration..."
echo "Current value: 60480000 ms (16.8 hours)"
echo "Calculating hours: $((60480000 / 1000 / 60 / 60))"
```

Length of output: 114
🌐 Web query:
RabbitMQ consumer_timeout recommended value best practices
💡 Result:
Short answer
- Default/recommended global value: 1,800,000 ms (30 minutes). [1][2]
- Don’t set it under 60,000 ms (1 minute); values <5 minutes are discouraged. Enforcement is checked ~once per minute. [1]
- Use per‑queue settings (policy or x-consumer-timeout) if only some queues need a different timeout. [1]
Best practices
- Keep the default 30m for most workloads; increase to a few hours if your consumers legitimately take long to ack. [1][2]
- Prefer raising the timeout per-queue (policy or queue arg) rather than disabling globally. [1]
- Avoid disabling the timeout entirely unless you understand the risks (stuck consumers can consume resources and block requeues). If you must, prefer a very large value over turning it off. [1][4]
- If you run a managed broker (CloudAMQP, Amazon MQ, etc.), check provider limits and defaults (some enforce 30m or allow “infinite”/0). [2][3]
- Monitor for channels being closed with PRECONDITION_FAILED and tune based on observed consumer behaviour and processing patterns. [1][5]
References
[1] RabbitMQ docs — Delivery acknowledgement / consumer timeout (consumer_timeout, per-queue policy/arg).
[2] RabbitMQ docs — Configurable values / default consumer_timeout = 1800000 ms.
[3] Amazon MQ docs — consumer_timeout default and allowed range.
[4] CloudAMQP docs — consumer timeout explanation and range.
[5] MailerQ blog — example recommending disabling for long-running consumers (illustrates when you might need higher/disabled value).
🏁 Script executed:

```bash
#!/bin/bash
# Search for documentation or comments about RabbitMQ timeout configuration
rg -i "consumer_timeout|rabbitmq.*timeout|reprocessing" -C 3 --type=txt --type=md
```

Length of output: 1367
🏁 Script executed:

```bash
#!/bin/bash
# Check the .envs/.local/.django file for context and any surrounding comments
cat .envs/.local/.django | head -40
```

Length of output: 1241
Add documentation for the RabbitMQ consumer timeout or reconsider the value.
The 60480000 ms (16.8 hours) timeout significantly exceeds RabbitMQ's default recommended value of 1,800,000 ms (30 minutes), and best practices recommend keeping the default 30 minutes for most workloads and increasing to "a few hours" only if consumers legitimately require longer to acknowledge messages. While your codebase does include reprocessing pipelines that may justify an extended timeout, no documentation currently explains why this specific value is needed. RabbitMQ guidance cautions against overly long timeouts as they can mask stuck consumers—a valid concern for development environments.
Add a comment explaining the 16.8-hour requirement, or verify whether a shorter timeout (e.g., 1–2 hours, aligned with RabbitMQ recommendations) would suffice for your reprocessing workloads. Additionally, consider using per-queue settings (policy or queue arguments) instead of a global server-level timeout to apply the extended timeout only to queues that need it.
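For the per-queue route, recent RabbitMQ versions accept a consumer-timeout policy key; a hypothetical example, where the policy name and queue pattern are placeholders:

```bash
# Apply a 2-hour consumer timeout only to queues matching "^ml\."
rabbitmqctl set_policy ml-consumer-timeout "^ml\." \
  '{"consumer-timeout": 7200000}' --apply-to queues
```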
Can I set a default for rabbit consumer_timeout as a django setting?
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Find Django settings files and check for Celery/RabbitMQ configuration
echo "=== Django settings files ==="
fd -e py settings
echo -e "\n=== Celery configuration in Django settings ==="
rg -n "CELERY_|celery" --type=py -g "settings*.py" -C 2
echo -e "\n=== RabbitMQ configuration references ==="
rg -n "RABBITMQ_|rabbitmq|consumer_timeout" --type=py -C 2
echo -e "\n=== Docker compose files that might show RabbitMQ container config ==="
fd -e yml -e yaml docker-compose
```

Length of output: 635
🏁 Script executed:

```bash
#!/bin/bash
# Find Django settings directory
echo "=== Finding Django config/settings ==="
fd settings.py
echo -e "\n=== Check docker-compose.yml for RabbitMQ configuration ==="
cat docker-compose.yml | grep -A 20 -B 5 "rabbitmq"
echo -e "\n=== Check for Celery configuration files ==="
fd -e py celery
echo -e "\n=== Search for any RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS usage ==="
rg "RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS" -A 2 -B 2
```

Length of output: 1682
🏁 Script executed:

```bash
#!/bin/bash
# Check Celery configuration
echo "=== Celery app configuration ==="
cat config/celery_app.py
echo -e "\n=== Check if there are Django settings directories ==="
fd -t d settings
echo -e "\n=== Look for any timeout-related Celery settings ==="
rg -i "timeout" config/celery_app.py
```

Length of output: 782
🏁 Script executed:

```bash
#!/bin/bash
# Check Django settings for any existing Celery configuration
echo "=== Django settings files ==="
cat config/settings/base.py | grep -A 5 -B 2 "CELERY" || echo "No CELERY settings in base.py"
echo -e "\n=== Check local settings ==="
cat config/settings/local.py | grep -A 5 -B 2 "CELERY" || echo "No CELERY settings in local.py"
echo -e "\n=== List all settings files ==="
ls -la config/settings/
echo -e "\n=== Search for any Celery timeout configurations ==="
rg "CELERY.*TIMEOUT|task_time_limit|task_soft_time_limit" config/settings/
```

Length of output: 4963
@mihow No, you cannot set RabbitMQ's consumer_timeout as a Django setting because it's a server-side RabbitMQ configuration, not a client-side setting.
Here's the key distinction:
Server-side (RabbitMQ):
- `consumer_timeout` is an Erlang VM argument that configures the RabbitMQ server itself
- Must be set via the `RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS` environment variable (as you're doing now) or in `rabbitmq.conf`
- Controls how long the RabbitMQ server waits before disconnecting inactive consumers
Client-side (Django/Celery):
- Django settings (including `CELERY_*` settings) only control how your Django application and Celery workers connect to and interact with RabbitMQ
- I can see you already have `CELERY_TASK_TIME_LIMIT = 4 * 60 * 60 * 24` (4 days) in `config/settings/base.py`, which controls how long tasks can run before being terminated by Celery
Alternative approaches:
- Keep the environment variable approach (current - standard for Docker deployments)
- Use RabbitMQ policies to set `x-consumer-timeout` per queue if you only need extended timeouts for specific queues
- Create a `rabbitmq.conf` file mounted into the container
The environment variable approach you're using is the standard way to configure RabbitMQ server settings in containerized deployments.
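For reference, the `rabbitmq.conf` equivalent of the current Erlang-args value would be a single line (same 60480000 ms; shown as a sketch of the mounted-file option above):

```ini
# rabbitmq.conf mounted into the container
consumer_timeout = 60480000
```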
```python
reprocess_all_images = False
if job_id is not None:
    try:
        job = Job.objects.get(pk=job_id)
        reprocess_all_images = job.project.feature_flags.reprocess_all_images
        job.logger.info(
            f"Processing {len(image_ids)} images for job {job} (reprocess_all_images={reprocess_all_images})"
        )
    except Job.DoesNotExist as e:
        logger.error(f"Job {job_id} not found: {e}")
else:
    logger.info(
        f"Processing {len(image_ids)} images for job_id=None (reprocess_all_images={reprocess_all_images})"
    )
```
Set job_id = None when Job doesn't exist to prevent downstream errors.
When Job.DoesNotExist is caught (line 26), the code continues with the original (invalid) job_id value, which is then passed to process_images() (line 41) and save_results() (line 46). While you mentioned that "process_images handles job_id=None", the code doesn't actually set job_id = None in the exception handler.
Additionally, use logger.exception() instead of logger.error() to capture the full traceback. Based on learnings (past review comments and static analysis).
Apply this diff:
```diff
  job = None
  reprocess_all_images = False
  if job_id is not None:
      try:
          job = Job.objects.get(pk=job_id)
          reprocess_all_images = job.project.feature_flags.reprocess_all_images
          job.logger.info(
              f"Processing {len(image_ids)} images for job {job} (reprocess_all_images={reprocess_all_images})"
          )
      except Job.DoesNotExist as e:
-         logger.error(f"Job {job_id} not found: {e}")
+         logger.exception(f"Job {job_id} not found: {e}")
+         job_id = None
  else:
      logger.info(
          f"Processing {len(image_ids)} images for job_id=None (reprocess_all_images={reprocess_all_images})"
      )
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
reprocess_all_images = False
if job_id is not None:
    try:
        job = Job.objects.get(pk=job_id)
        reprocess_all_images = job.project.feature_flags.reprocess_all_images
        job.logger.info(
            f"Processing {len(image_ids)} images for job {job} (reprocess_all_images={reprocess_all_images})"
        )
    except Job.DoesNotExist as e:
        logger.exception(f"Job {job_id} not found: {e}")
        job_id = None
else:
    logger.info(
        f"Processing {len(image_ids)} images for job_id=None (reprocess_all_images={reprocess_all_images})"
    )
```
🧰 Tools
🪛 Ruff (0.14.5)
27-27: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
🤖 Prompt for AI Agents
ami/ml/tasks.py around lines 18 to 32: when Job.DoesNotExist is caught the code
logs the error but leaves job_id unchanged which can cause downstream errors;
update the exception handler to set job_id = None (so downstream calls receive
None) and replace logger.error(...) with logger.exception(...) to record the
full traceback; keep the existing log message context and ensure any local `job`
variable isn’t referenced after the exception.
@coderabbitai make a new ticket about this issue
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
# Create a GitHub issue for the job_id handling problem
gh issue create \
  --repo RolnickLab/antenna \
  --title "Handle Job.DoesNotExist exception properly in process_source_images_async" \
  --body "## Problem Description
In \`ami/ml/tasks.py\`, the \`process_source_images_async\` function catches \`Job.DoesNotExist\` exceptions but doesn't reset \`job_id\` to \`None\`. This means the invalid \`job_id\` is passed to downstream functions (\`process_images()\` and \`save_results()\`), which could cause errors.
Additionally, the exception handler uses \`logger.error()\` instead of \`logger.exception()\`, which means the full traceback is not captured for debugging.
## Context
- **File:** \`ami/ml/tasks.py\`
- **Function:** \`process_source_images_async\`
- **Related PR:** #1053
- **Review Comment:** https://github.com/RolnickLab/antenna/pull/1053#discussion_r2540537287
## Expected Behavior
When a \`Job.DoesNotExist\` exception occurs:
1. The \`job_id\` variable should be set to \`None\` to prevent passing an invalid ID downstream
2. The full exception traceback should be logged using \`logger.exception()\` for debugging
---
Requested by: @mihow" \
  --assignee mihow
```
Length of output: 49
@mihow, I've created issue #1061 to track this problem: #1061
The issue has been assigned to you and includes the problem description along with links back to this PR and comment.
🧬 Code Graph Analysis Results
```json
[
{
"filename": "ami/ml/media.py",
"start_line": 78,
"end_line": 94,
"snippet_type": "full",
"content": "def create_detection_images_from_source_image(source_image: SourceImage) -> list[str]:\n # Check if the source image has detections without images before loading the image\n if not source_image.detections.filter(path__isnull=True).exists():\n return []\n\n image_np = load_source_image(source_image)\n processed_paths = []\n\n for detection in source_image.detections.filter(path__isnull=True):\n if detection.bbox:\n cropped_image = crop_detection(image_np, detection.bbox)\n path = save_crop(cropped_image, detection, source_image)\n detection.path = path\n detection.save()\n processed_paths.append(path)\n\n return processed_paths"
},
{
"filename": "ami/ml/models/pipeline.py",
"start_line": 163,
"end_line": 211,
"snippet_type": "summary",
"content": ""def process_images(pipeline, endpoint_url, images, job_id=None, project_id=None, reprocess_all_images=False) -> PipelineResultsResponse":\n\n Process images by sending a request to the ML backend for a given pipeline.\n\n Parameters:\n - pipeline: Pipeline instance describing stages and config.\n - endpoint_url: Backend endpoint to which the request is posted.\n - images: Iterable of SourceImage instances to process.\n - job_id: Optional Job ID for logger context; if provided, uses the job's logger.\n - project_id: Optional project ID to derive pipeline config overrides.\n - reprocess_all_images: If True, forces reprocessing of all images; otherwise filters already processed ones.\n\n Returns:\n - PipelineResultsResponse containing processed images, detections, and metadata.\n\n Exceptions/Errors:\n - Logs errors and returns a results object with errors if the backend call fails.\n\n Notes:\n - Builds a request payload including source images, config, and any pre-detected detections.\n - Uses the provided endpoint_url and handles logging via either the Job logger or a module logger."
},
{
"filename": "ami/ml/models/pipeline.py",
"start_line": 814,
"end_line": 922,
"snippet_type": "summary",
"content": ""def save_results(results: PipelineResultsResponse | None = None, results_json: str | None = None, job_id: int | None = None, return_created=False) -> PipelineSaveResults | None":\n\n Save results returned by the ML pipeline backend into the database.\n\n Parameters:\n - results: PipelineResultsResponse object parsed in memory.\n - results_json: Raw JSON string representing the results (alternative to results).\n - job_id: Optional job context for logging and associations.\n - return_created: If True, return a PipelineSaveResults object describing created entities.\n\n Returns:\n - PipelineSaveResults when return_created is True; otherwise None.\n\n Behavior:\n - Creates/associates Pipeline, SourceImage, Detections, Classifications, and Occurrences.\n - Spawns sub-tasks to create detection images for detected sources.\n - Updates calculated fields on affected SourceImage and Events.\n - Logs timing and outcomes."
},
{
"filename": "ami/main/models.py",
"start_line": 2232,
"end_line": 2243,
"snippet_type": "full",
"content": "def find_duplicates(self, project_id: int | None = None) -> models.QuerySet:\n # Find the oldest classification for each unique combination\n if project_id:\n self = self.filter(detection__source_image__project_id=project_id)\n unique_oldest = (\n self.values("detection", "taxon", "algorithm", "score", "softmax_output", "raw_output")\n .annotate(min_id=models.Min("id"))\n .distinct()\n )\n\n # Keep only the oldest classifications\n return self.exclude(id__in=[item["min_id"] for item in unique_oldest])"
},
{
"filename": "ami/ml/models/processing_service.py",
"start_line": 36,
"end_line": 222,
"snippet_type": "summary",
"content": ""class ProcessingService(BaseModel):" ... "def get_status(self, timeout=90) -> ProcessingServiceStatusResponse":\n\n Health check for a processing service.\n\n - Pings the /readyz endpoint with a session that includes retry/backoff logic.\n - Updates last_checked, last_checked_live, and last_checked_latency fields on the service.\n - Parses pipeline statuses from the response if available.\n\n Returns a ProcessingServiceStatusResponse containing timestamp, request_successful, server_live,\n pipelines_online, pipeline_configs, endpoint_url, error, latency."
},
{
"filename": "ami/jobs/models.py",
"start_line": 990,
"end_line": 999,
"snippet_type": "full",
"content": " def logger(self) -> logging.Logger:\n _logger = logging.getLogger(f"ami.jobs.{self.pk}")\n\n # Only add JobLogHandler if not already present\n if not any(isinstance(h, JobLogHandler) for h in _logger.handlers):\n # Also log output to a field on thie model instance\n logger.info("Adding JobLogHandler to logger for job %s", self.pk)\n _logger.addHandler(JobLogHandler(self))\n _logger.propagate = False\n return _logger"
}
]
```
@carlosgjs I made a dedicated PR for the consumer timeout change

Summary
This pull request introduces a new `reprocess_all_images` feature flag to control whether or not already-processed images are filtered during processing jobs (the default behavior), and propagates this flag throughout the image processing pipeline. This allows projects to configure whether jobs should reprocess existing images or ignore them, improving flexibility for development and production use.

Configuration and Tooling
Updated the `flake8` linter version in `.pre-commit-config.yaml` from 6.0.0 to 6.1.0 because 6.0.0 fires false positives inside f-strings, e.g. where `"not in"` is not code but a string message.
Tests:

With default `reprocess_all_images=false`: Run time: 1:07

Changed to `true`:
Checklist
Summary by CodeRabbit
New Features
Chores