Merged
57 commits
8aad275
Set up customizable local processing service
vanessavmac Mar 23, 2025
61b45a4
Set up separate docker compose stack, rename ml backend services
vanessavmac Apr 3, 2025
4a03c7e
WIP: README.md
vanessavmac Apr 4, 2025
09d7dfb
Improve processing flow
vanessavmac Apr 5, 2025
996674e
fix: tests and postgres connection
vanessavmac Apr 5, 2025
ce973fc
Update READMEs with minimal/example setups
vanessavmac Apr 5, 2025
bf7178d
fix: transformers fixed version
vanessavmac Apr 5, 2025
41efa42
Add tests
vanessavmac Apr 5, 2025
78babeb
Typos, warn --> warnings
vanessavmac Apr 5, 2025
8d28d01
Add support for Darsa flat-bug
vanessavmac Apr 6, 2025
bb22514
chore: Change the Pipeline class name to FlatBugDetectorPipeline to a…
mohamedelabbas1996 Apr 7, 2025
1dbc5f0
Move README
vanessavmac Apr 8, 2025
fe1a9f4
Address comment tasks
vanessavmac Apr 13, 2025
7747f3a
Merge branch 'main' into 747-get-antenna-to-work-locally-on-laptops-f…
vanessavmac Apr 13, 2025
1978cbe
Update README
vanessavmac Apr 13, 2025
82ac82d
Pass in pipeline request config, properly cache models, simplifications
vanessavmac Apr 15, 2025
7d733f9
Pass in pipeline request config, properly cache models, simplifications
vanessavmac Apr 15, 2025
07d61d9
fix: update docker compose instructions & build path
mihow Apr 16, 2025
d129029
feat: use ["insect"] for the default zero-shot class
mihow Apr 16, 2025
76ce2d8
feat: try to use faster version of zero-shot detector
mihow Apr 16, 2025
035b952
feat: use gpu if available
mihow Apr 17, 2025
1230386
fix: update minimal docker compose build path
vanessavmac Apr 17, 2025
45dbacf
Add back crop_image_url
vanessavmac Apr 26, 2025
7361fb2
Support re-processing detections and skipping localizer
vanessavmac Apr 27, 2025
3f722c8
fix: correctly pass candidate labels for zero shot object detector
vanessavmac Apr 27, 2025
075a7ec
Support re-processing detections and skipping localizer
vanessavmac Apr 27, 2025
85c676d
fix: merge conflict
vanessavmac Apr 27, 2025
cbd7ae0
fix: allow empty pipeline request config
vanessavmac Apr 27, 2025
7d15ffb
fix: allow empty pipeline request config
vanessavmac Apr 27, 2025
c2881b4
clean up
vanessavmac Apr 27, 2025
14396ba
fix: ignore detection algorithm during reprocessing
vanessavmac Apr 29, 2025
6613366
remove flat bug
vanessavmac Apr 29, 2025
2cf0c0a
feat: only use zero shot and HF classifier algorithms
vanessavmac Apr 29, 2025
1dbf3b1
clean up
vanessavmac Apr 29, 2025
c82c076
Merge branch '747-get-antenna-to-work-locally-on-laptops-for-panama-t…
vanessavmac Apr 29, 2025
4643910
Expand support for date formats in image filenames (#809)
mihow Apr 29, 2025
52f6964
fix: change name of the new docker network (#819)
mihow Apr 30, 2025
635e671
Merge branch 'main' of github.com:RolnickLab/antenna into 706-support…
mihow Apr 30, 2025
d89f428
docs: clarify new Detection schema/class
mihow Apr 30, 2025
fb874c4
Function for creating detection instances from requests
vanessavmac May 17, 2025
f2ef5ff
Add reprocessing to minimal app
vanessavmac May 17, 2025
b6ce90f
Merge branch 'main' into 706-support-for-reprocessing-detections-and-…
vanessavmac May 17, 2025
8fe8b1d
Merge branch 'main' into 706-support-for-reprocessing-detections-and-…
vanessavmac Jun 28, 2025
d0f4f26
Add re-processing test
vanessavmac Jun 28, 2025
fc8470d
Merge branch 'main' into 706-support-for-reprocessing-detections-and-…
mihow Jul 7, 2025
3d3b820
Fix requirements
vanessavmac Jul 12, 2025
5c7af56
Address review comments
vanessavmac Jul 12, 2025
e7e579e
Only open source image once
vanessavmac Jul 12, 2025
cb74eac
Merge branch 'main' into 706-support-for-reprocessing-detections-and-…
vanessavmac Jul 12, 2025
0a80074
Merge branch '706-support-for-reprocessing-detections-and-skipping-de…
mihow Aug 15, 2025
015cb57
Merge branch 'main' of github.com:RolnickLab/antenna into 706-support…
mihow Aug 15, 2025
57de883
feat: cache huggingface & torch models that are auto-downloaded
mihow Aug 16, 2025
5ebe896
fix: leave gpu passthrough as an example, off by default
mihow Aug 16, 2025
789305c
feat: feature flag & pipeline config for reprocessing detections
mihow Aug 16, 2025
9dab8a5
fix: spelling in previous feature flag
mihow Aug 16, 2025
63e1517
chore: migration for new & corrected feature flags
mihow Aug 16, 2025
da35e16
fix: append detections instead of overriding. add feature flag to tests.
mihow Aug 16, 2025
12 changes: 12 additions & 0 deletions ami/main/models.py
@@ -29,6 +29,7 @@
from ami.base.fields import DateStringField
from ami.base.models import BaseModel
from ami.main import charts
+from ami.ml.schemas import BoundingBox
from ami.users.models import User
from ami.utils.schemas import OrderedEnum

@@ -2102,6 +2103,17 @@ class Detection(BaseModel):
source_image_id: int
detection_algorithm_id: int

+    def get_bbox(self):
+        if self.bbox:
+            return BoundingBox(
+                x1=self.bbox[0],
+                y1=self.bbox[1],
+                x2=self.bbox[2],
+                y2=self.bbox[3],
+            )
+        else:
+            return None
+
# def bbox(self):
# return (
# self.bbox_x,
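As a quick illustration of the new helper, here is a hedged usage sketch (the `Detection` instance and coordinate values are hypothetical; `get_bbox()` returns `None` when no bbox is stored):

```python
# Hypothetical usage of Detection.get_bbox(); values are illustrative only.
from ami.main.models import Detection

detection = Detection(bbox=[10.0, 20.0, 110.0, 220.0])  # unsaved instance, bbox stored as [x1, y1, x2, y2]
bbox = detection.get_bbox()
if bbox is not None:
    # BoundingBox exposes named corner coordinates
    width = bbox.x2 - bbox.x1
    height = bbox.y2 - bbox.y1
    print(f"Crop size: {width}x{height}")
```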
61 changes: 42 additions & 19 deletions ami/ml/models/pipeline.py
@@ -38,7 +38,9 @@
from ami.ml.models.algorithm import Algorithm, AlgorithmCategoryMap
from ami.ml.schemas import (
AlgorithmConfigResponse,
+    AlgorithmReference,
ClassificationResponse,
+    DetectionRequest,
DetectionResponse,
PipelineRequest,
PipelineRequestConfigParameters,
@@ -61,6 +63,7 @@ def filter_processed_images(
Return only images that need to be processed by a given pipeline.
An image needs processing if:
1. It has no detections from the pipeline's detection algorithm
+    or
2. It has detections but they don't have classifications from all the pipeline's classification algorithms
"""
pipeline_algorithms = pipeline.algorithms.all()
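In other words, callers can use this helper to skip images that are already fully processed. A hedged usage sketch (the exact signature should be checked in `ami/ml/models/pipeline.py`; `images`, `pipeline`, and `project_id` are assumed to be in scope):

```python
# Sketch: only send images that still need work to the processing service.
images_to_process = filter_processed_images(images=images, pipeline=pipeline)
if images_to_process:
    pipeline.process_images(images_to_process, project_id=project_id)
```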
@@ -191,25 +194,46 @@ def process_images(
task_logger.info(f"Sending {len(images)} images to Pipeline {pipeline}")
urls = [source_image.public_url() for source_image in images if source_image.public_url()]

-    source_images = [
-        SourceImageRequest(
-            id=str(source_image.pk),
-            url=url,
-        )
-        for source_image, url in zip(images, urls)
-        if url
-    ]
+    source_images: list[SourceImageRequest] = []
+    detection_requests: list[DetectionRequest] = []
+
+    for source_image, url in zip(images, urls):
+        if url:
+            source_images.append(
+                SourceImageRequest(
+                    id=str(source_image.pk),
+                    url=url,
+                )
+            )
+            # Only re-process detections created by the pipeline's detector
+            for detection in source_image.detections.all():
+                bbox = detection.get_bbox()
+                if bbox and detection.detection_algorithm:
+                    detection_requests.append(
+                        DetectionRequest(
+                            source_image=source_images[-1],
mihow (Collaborator) commented on Jul 7, 2025:

    Make sure this is the right source image! Can you use just source_image=source_image instead?

vanessavmac (Author) replied:

    Looks like DetectionRequest expects a SourceImageRequest, so I was assigning it the last created request (which corresponds to the source_image). I tried to make my code a bit clearer like this:

        source_image_request = SourceImageRequest(
            id=str(source_image.pk),
            url=url,
        )
        source_image_requests.append(source_image_request)
        # Re-process all existing detections if they exist
        for detection in source_image.detections.all():
            bbox = detection.get_bbox()
            if bbox and detection.detection_algorithm:
                detection_requests.append(
                    DetectionRequest(
                        source_image=source_image_request,

+                            bbox=bbox,
+                            crop_image_url=detection.url(),
+                            algorithm=AlgorithmReference(
+                                name=detection.detection_algorithm.name,
+                                key=detection.detection_algorithm.key,
+                            ),
+                        )
+                    )

if not project_id:
task_logger.warning(f"Pipeline {pipeline} is not associated with a project")

config = pipeline.get_config(project_id=project_id)
task_logger.info(f"Using pipeline config: {config}")

+    task_logger.info(f"Found {len(detection_requests)} existing detections.")

request_data = PipelineRequest(
pipeline=pipeline.slug,
source_images=source_images,
config=config,
+        detections=detection_requests,
)

session = create_session()
@@ -350,23 +374,12 @@ def get_or_create_detection(
serialized_bbox = list(detection_resp.bbox.dict().values())
detection_repr = f"Detection {detection_resp.source_image_id} {serialized_bbox}"

-    assert detection_resp.algorithm, f"No detection algorithm was specified for detection {detection_repr}"
-    try:
-        detection_algo = algorithms_used[detection_resp.algorithm.key]
-    except KeyError:
-        raise ValueError(
-            f"Detection algorithm {detection_resp.algorithm.key} is not a known algorithm. "
-            "The processing service must declare it in the /info endpoint. "
-            f"Known algorithms: {list(algorithms_used.keys())}"
-        )

assert str(detection_resp.source_image_id) == str(
source_image.pk
), f"Detection belongs to a different source image: {detection_repr}"

existing_detection = Detection.objects.filter(
source_image=source_image,
-        detection_algorithm=detection_algo,
bbox=serialized_bbox,
Copilot AI commented on Aug 15, 2025:

    The query for existing detections no longer filters by detection_algorithm, which could cause incorrect detection matching when the same bbox exists from different algorithms.

A collaborator replied:

    This is intentional; we want to classify detections from any previous detection algorithm, unless the user disables it for the pipeline run.
).first()

@@ -386,6 +399,16 @@
detection = existing_detection

else:
+        assert detection_resp.algorithm, f"No detection algorithm was specified for detection {detection_repr}"
+        try:
+            detection_algo = algorithms_used[detection_resp.algorithm.key]
+        except KeyError:
+            raise ValueError(
+                f"Detection algorithm {detection_resp.algorithm.key} is not a known algorithm. "
+                "The processing service must declare it in the /info endpoint. "
+                f"Known algorithms: {list(algorithms_used.keys())}"
+            )

new_detection = Detection(
source_image=source_image,
bbox=serialized_bbox,
28 changes: 18 additions & 10 deletions ami/ml/schemas.py
@@ -112,16 +112,6 @@ class ClassificationResponse(pydantic.BaseModel):
timestamp: datetime.datetime


-class DetectionResponse(pydantic.BaseModel):
-    source_image_id: str
-    bbox: BoundingBox
-    inference_time: float | None = None
-    algorithm: AlgorithmReference
-    timestamp: datetime.datetime
-    crop_image_url: str | None = None
-    classifications: list[ClassificationResponse] = []


class SourceImageRequest(pydantic.BaseModel):
# @TODO bring over new SourceImage & b64 validation from the lepsAI repo
id: str
@@ -144,6 +134,23 @@ class Config:
]


+class DetectionRequest(pydantic.BaseModel):
+    source_image: SourceImageRequest  # the 'original' image
+    bbox: BoundingBox
+    crop_image_url: str | None = None
+    algorithm: AlgorithmReference


+class DetectionResponse(pydantic.BaseModel):
+    source_image_id: str
+    bbox: BoundingBox
+    inference_time: float | None = None
+    algorithm: AlgorithmReference
+    timestamp: datetime.datetime
+    crop_image_url: str | None = None
+    classifications: list[ClassificationResponse] = []


class PipelineRequestConfigParameters(dict):
"""Parameters used to configure a pipeline request.

@@ -166,6 +173,7 @@ class PipelineRequestConfigParameters(dict):
class PipelineRequest(pydantic.BaseModel):
pipeline: str
source_images: list[SourceImageRequest]
+    detections: list[DetectionRequest] | None = None
config: PipelineRequestConfigParameters | dict | None = None


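To see how these schemas compose, here is a hedged sketch of building a reprocessing request (IDs, URLs, and the algorithm reference are made-up values; the field names follow the pydantic models above):

```python
# Illustrative only: constructing a PipelineRequest that carries existing detections.
from ami.ml.schemas import (
    AlgorithmReference,
    BoundingBox,
    DetectionRequest,
    PipelineRequest,
    SourceImageRequest,
)

source_image = SourceImageRequest(id="123", url="https://example.com/images/123.jpg")
request = PipelineRequest(
    pipeline="constant",
    source_images=[source_image],
    # Existing detections let the processing service skip its own detector
    detections=[
        DetectionRequest(
            source_image=source_image,
            bbox=BoundingBox(x1=10, y1=20, x2=110, y2=220),
            crop_image_url="https://example.com/crops/123-0.jpg",
            algorithm=AlgorithmReference(name="Example Detector", key="example-detector"),
        )
    ],
)
print(request.json())
```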
50 changes: 48 additions & 2 deletions ami/ml/tests.py
@@ -158,7 +158,7 @@ def test_created_category_maps(self):

def test_alignment_of_predictions_and_category_map(self):
# Ensure that the scores and labels are aligned
-        pipeline = self.processing_service_instance.pipelines.all().get(slug="random")
+        pipeline = self.processing_service_instance.pipelines.all().get(slug="random-detection-random-species")
pipeline_response = pipeline.process_images(self.test_images, project_id=self.project.pk)
results = save_results(pipeline_response, return_created=True)
assert results is not None, "Expected results to be returned in a PipelineSaveResults object"
@@ -172,7 +172,7 @@ def test_alignment_of_predictions_and_category_map(self):

def test_top_n_alignment(self):
# Ensure that the top_n parameter works
-        pipeline = self.processing_service_instance.pipelines.all().get(slug="random")
+        pipeline = self.processing_service_instance.pipelines.all().get(slug="random-detection-random-species")
pipeline_response = pipeline.process_images(self.test_images, project_id=self.project.pk)
results = save_results(pipeline_response, return_created=True)
assert results is not None, "Expected results to be returned in a PipelineSaveResults object"
@@ -182,6 +182,52 @@
assert classification.score == top_n[0]["score"]
assert classification.taxon == top_n[0]["taxon"]

+    def test_pipeline_reprocessing(self):
+        """
+        Test that reprocessing the same images with different pipelines does not create duplicate
+        detections. The 2 pipelines used are a random detection + random species classifier, and a
+        constant species classifier.
+        """
+        # Process the images once
+        pipeline = self.processing_service_instance.pipelines.all().get(slug="random-detection-random-species")
+        pipeline_response = pipeline.process_images(self.test_images, project_id=self.project.pk)
+        results = save_results(pipeline_response, return_created=True)
+        assert results is not None, "Expected results to be returned in a PipelineSaveResults object"
+        assert results.detections, "Expected detections to be returned in the results"
+
+        # This particular pipeline produces 2 classifications per detection
+        for det in results.detections:
+            num_classifications = det.classifications.count()
+            assert (
+                num_classifications == 2
+            ), "Expected 2 classifications per detection (random species and random binary classifier)."
+
+        source_images = SourceImage.objects.filter(pk__in=[image.id for image in pipeline_response.source_images])
+        detections = Detection.objects.filter(source_image__in=source_images).select_related(
+            "detection_algorithm",
+            "detection_algorithm__category_map",
+        )
+        assert detections.count() > 0
+        initial_num_detections = detections.count()
+
+        # Reprocess the same images
+        pipeline = self.processing_service_instance.pipelines.all().get(slug="constant")
+        pipeline_response = pipeline.process_images(self.test_images, project_id=self.project.pk)
mihow (Collaborator) commented on Jul 7, 2025:

    Perhaps we should make a way to check the request data that will be sent to the pipeline. In this case, it should have detections in the request, whereas the other requests only have the source images. This can be a later optimization (prepare request data, then manually pass it to process_images).
+        reprocessed_results = save_results(pipeline_response, return_created=True)
+        assert reprocessed_results is not None, "Expected results to be returned in a PipelineSaveResults object"
+        assert reprocessed_results.detections, "Expected detections to be returned in the results"
+
+        source_images = SourceImage.objects.filter(pk__in=[image.id for image in pipeline_response.source_images])
+        detections = Detection.objects.filter(source_image__in=source_images).select_related(
+            "detection_algorithm",
+            "detection_algorithm__category_map",
+        )
+        assert initial_num_detections == detections.count(), "Expected no new detections to be created."
+        for detection in detections:
+            assert (
+                detection.classifications.count() == 3
+            ), "Expected 3 classifications per detection (2 random classifiers + constant classifier)."


class TestPipeline(TestCase):
def setUp(self):
56 changes: 43 additions & 13 deletions processing_services/README.md
@@ -8,19 +8,19 @@ In this directory, we define locally-run processing services as FastAPI apps. A
- `/info`: returns data about what pipelines and algorithms are supported by the service.
- `/livez`
- `/readyz`
-- `/process`: receives source images via a `PipelineRequest` and returns a `PipelineResponse` containing detections
+- `/process`: receives source images and existing detections via a `PipelineRequest` and returns a `PipelineResponse` containing detections
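As a rough sketch, a client call to `/process` might look like the following (illustrative IDs and URLs; the payload shape is defined by `PipelineRequest` in the app's `api/schemas.py`, and the `requests` package is assumed):

```python
# Hedged example request to a local processing service; values are made up.
import requests

payload = {
    "pipeline": "zero-shot-object-detector-pipeline",
    "source_images": [{"id": "1", "url": "https://example.com/trap/1.jpg"}],
    "detections": [],  # optionally pass existing detections to re-process
}
resp = requests.post("http://ml_backend_example:2000/process", json=payload, timeout=120)
resp.raise_for_status()
print(resp.json())
```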

`processing_services` contains 2 apps:
- `example`: demos how to add custom pipelines/algorithms.
- `minimal`: a simple ML backend for basic testing of the processing service API. This minimal app also runs within the main Antenna docker compose stack.

-If your goal is to run an ML backend locally, simply copy the `example` directory and follow the steps below.
+If your goal is to run an ML backend locally, simply copy the `example` app and follow the steps below.

## Environment Set Up

1. Update `processing_services/example/requirements.txt` with required packages (e.g., PyTorch)
2. Rebuild container to install updated dependencies. Start the minimal and example ML backends: `docker compose -f processing_services/docker-compose.yml up -d --build ml_backend_example`
-3. To test that everything works, register a new processing service in Antenna with endpoint URL http://ml_backend_example:2000. All ML backends are connected to the main docker compose stack using the `antenna_network`.
+3. To test that everything works, register a new processing service in Antenna with endpoint URL http://ml_backend_example:2000. All ML backends are connected to the main docker compose stack using the `ml_network`.
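To sanity-check a running service before registering it, you can query its health and info endpoints. A sketch (run it from a container attached to `ml_network`, since the hostname above only resolves inside the docker network; the `requests` package is assumed):

```python
# Smoke test: confirm the service is up and see which pipelines it advertises.
import requests

base = "http://ml_backend_example:2000"
assert requests.get(f"{base}/livez", timeout=10).ok
print(requests.get(f"{base}/info", timeout=10).json())
```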


## Add Algorithms, Pipelines, and ML Backend/Processing Services
@@ -38,26 +38,56 @@ If your goal is to run an ML backend locally, simply copy the `example` director
3. Implement the `run()` function. Some important considerations:
- Always run `_get_pipeline_response` at the end of `run()` to get a valid `PipelineResultsResponse`
- Each algorithm/stage in a pipeline should take a list of `SourceImage`s or `Detection`s and produce a list of `Detection`s (with or without classifications). The class member function `_get_detections()` handles this general stage structure; it batches the inputs and produces the output detections. A sketch of a custom pipeline is shown after this list.
-- 2 example pipelines are already implemented:
-    - `ZeroShotHFClassifierPipeline`: localizer + classifier
-    - `ZeroShotObjectDetectorPipeline`: detector
+- 4 example pipelines are already implemented. See the images at the end of the README for examples of what detections from each pipeline look like.
+    - `ZeroShotHFClassifierPipeline`
+    - `ZeroShotObjectDetectorPipeline`
+    - `ZeroShotObjectDetectorWithRandomSpeciesClassifierPipeline`
+    - `ZeroShotObjectDetectorWithConstantClassifierPipeline`
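Below is a minimal sketch of what such a custom pipeline might look like; the base-class attributes, algorithm classes, and the exact `_get_detections()` signature are assumptions, so verify them against the pipelines in `processing_services/example/api/pipelines.py`:

```python
# Hypothetical two-stage pipeline; check helper names against the example app.
from .algorithms import HFImageClassifier, ZeroShotObjectDetector  # assumed module layout
from .pipelines import Pipeline


class NewPipeline(Pipeline):
    """Localizer + classifier pipeline, registered under the slug 'new-pipeline-slug'."""

    def run(self):
        # Stage 1: detect insects in the incoming source images
        detections = self._get_detections(self.source_images, ZeroShotObjectDetector())
        # Stage 2: classify each detection crop, attaching classifications
        detections = self._get_detections(detections, HFImageClassifier())
        # Always finish with _get_pipeline_response to build a valid PipelineResultsResponse
        return self._get_pipeline_response(detections)
```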

4. Add `NewPipeline` to `processing_services/example/api/api.py`

```
-from .pipelines import Pipeline, ZeroShotHFClassifierPipeline, ZeroShotObjectDetectorPipeline, NewPipeline
-
-...
-
-pipelines: list[type[Pipeline]] = [ZeroShotHFClassifierPipeline, ZeroShotObjectDetectorPipeline, NewPipeline]
+from .pipelines import (
+    Pipeline,
+    ZeroShotHFClassifierPipeline,
+    ZeroShotObjectDetectorPipeline,
+    ZeroShotObjectDetectorWithConstantClassifierPipeline,
+    ZeroShotObjectDetectorWithRandomSpeciesClassifierPipeline,
+    NewPipeline
+)
+...
+
+pipelines: list[type[Pipeline]] = [
+    ZeroShotHFClassifierPipeline,
+    ZeroShotObjectDetectorPipeline,
+    ZeroShotObjectDetectorWithConstantClassifierPipeline,
+    ZeroShotObjectDetectorWithRandomSpeciesClassifierPipeline,
+    NewPipeline,
+]
```
5. Update `PipelineChoice` in `processing_services/example/api/schemas.py` to include the slug of the new pipeline, as defined in `NewPipeline`'s config.

```
PipelineChoice = typing.Literal[
-    "zero-shot-hf-classifier-pipeline", "zero-shot-object-detector-pipeline", "new-pipeline"
+    "zero-shot-hf-classifier-pipeline",
+    "zero-shot-object-detector-pipeline",
+    "zero-shot-object-detector-with-constant-classifier-pipeline",
+    "zero-shot-object-detector-with-random-species-classifier-pipeline",
+    "new-pipeline-slug",
]
```
-## Demo
+## `minimal` Pipelines and Output Images
+
+- `ConstantPipeline` and `RandomDetectionRandomSpeciesPipeline`
+![MinimalReprocessing](images/MinimalReprocessing.png)
+
+
+## `example` Pipelines and Output Images
+
+- `ZeroShotHFClassifierPipeline`
+![ZeroShotHFClassifierPipeline](images/ZeroShotHFClassifierPipeline.png)
+
+- `ZeroShotObjectDetectorWithRandomSpeciesClassifierPipeline` and `ZeroShotObjectDetectorWithConstantClassifierPipeline` (using reprocessing, skips the Zero Shot Object Detector if there are existing detections)
+![ZeroShotReprocessing](images/ZeroShotReprocessing.png)