Support re-processing detections and skipping localizer #815
@@ -39,7 +39,9 @@
 from ami.ml.models.algorithm import Algorithm, AlgorithmCategoryMap
 from ami.ml.schemas import (
     AlgorithmConfigResponse,
+    AlgorithmReference,
     ClassificationResponse,
+    DetectionRequest,
     DetectionResponse,
     PipelineRequest,
     PipelineRequestConfigParameters,
@@ -62,6 +64,7 @@ def filter_processed_images(
     Return only images that need to be processed by a given pipeline.
     An image needs processing if:
     1. It has no detections from the pipeline's detection algorithm
+    or
     2. It has detections but they don't have classifications from all the pipeline's classification algorithms
     """
     pipeline_algorithms = pipeline.algorithms.all()
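For illustration, a minimal plain-Python sketch of the rule this docstring describes. The attribute names (image.detections, detection.classifications, .algorithm) are assumptions for the example only; the actual filter_processed_images works on Django querysets.

# Hypothetical sketch of the filtering rule above; not the actual ORM code.
def image_needs_processing(image, detection_algorithm, classification_algorithms) -> bool:
    detections = [d for d in image.detections if d.algorithm == detection_algorithm]
    if not detections:
        return True  # 1. no detections from this pipeline's detection algorithm
    for detection in detections:
        classified_by = {c.algorithm for c in detection.classifications}
        if not set(classification_algorithms) <= classified_by:
            return True  # 2. missing output from at least one classification algorithm
    return False  # fully processed by this pipeline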
@@ -192,25 +195,45 @@ def process_images(
     task_logger.info(f"Sending {len(images)} images to Pipeline {pipeline}")
     urls = [source_image.public_url() for source_image in images if source_image.public_url()]

-    source_images = [
-        SourceImageRequest(
-            id=str(source_image.pk),
-            url=url,
-        )
-        for source_image, url in zip(images, urls)
-        if url
-    ]
+    source_image_requests: list[SourceImageRequest] = []
+    detection_requests: list[DetectionRequest] = []
+
+    for source_image, url in zip(images, urls):
+        if url:
+            source_image_request = SourceImageRequest(
+                id=str(source_image.pk),
+                url=url,
+            )
+            source_image_requests.append(source_image_request)
+            # Re-process all existing detections if they exist
+            for detection in source_image.detections.all():
+                bbox = detection.get_bbox()
+                if bbox and detection.detection_algorithm:
+                    detection_requests.append(
+                        DetectionRequest(
+                            source_image=source_image_request,
+                            bbox=bbox,
+                            crop_image_url=detection.url(),
+                            algorithm=AlgorithmReference(
+                                name=detection.detection_algorithm.name,
+                                key=detection.detection_algorithm.key,
+                            ),
+                        )
+                    )

     if not project_id:
         task_logger.warning(f"Pipeline {pipeline} is not associated with a project")

     config = pipeline.get_config(project_id=project_id)
     task_logger.info(f"Using pipeline config: {config}")

+    task_logger.info(f"Found {len(detection_requests)} existing detections.")
+
     request_data = PipelineRequest(
         pipeline=pipeline.slug,
-        source_images=source_images,
+        source_images=source_image_requests,
         config=config,
+        detections=detection_requests,
     )

     session = create_session()
@@ -230,7 +253,8 @@ def process_images(
         pipeline=pipeline.slug,
         total_time=0,
         source_images=[
-            SourceImageResponse(id=source_image.id, url=source_image.url) for source_image in source_images
+            SourceImageResponse(id=source_image_request.id, url=source_image_request.url)
+            for source_image_request in source_image_requests
         ],
         detections=[],
         errors=msg,
@@ -351,23 +375,12 @@ def get_or_create_detection(
     serialized_bbox = list(detection_resp.bbox.dict().values())
     detection_repr = f"Detection {detection_resp.source_image_id} {serialized_bbox}"

-    assert detection_resp.algorithm, f"No detection algorithm was specified for detection {detection_repr}"
-    try:
-        detection_algo = algorithms_used[detection_resp.algorithm.key]
-    except KeyError:
-        raise ValueError(
-            f"Detection algorithm {detection_resp.algorithm.key} is not a known algorithm. "
-            "The processing service must declare it in the /info endpoint. "
-            f"Known algorithms: {list(algorithms_used.keys())}"
-        )
-
     assert str(detection_resp.source_image_id) == str(
         source_image.pk
     ), f"Detection belongs to a different source image: {detection_repr}"

     existing_detection = Detection.objects.filter(
         source_image=source_image,
-        detection_algorithm=detection_algo,
         bbox=serialized_bbox,
     ).first()

Review comment (Copilot): The query for existing detections no longer filters by detection_algorithm, which could cause incorrect detection matching when the same bbox exists from different algorithms.

Reply: This is intentional; we want to classify detections from any previous detection algorithm, unless the user disables it for the pipeline run.
@@ -387,6 +400,16 @@ def get_or_create_detection(
         detection = existing_detection

     else:
+        assert detection_resp.algorithm, f"No detection algorithm was specified for detection {detection_repr}"
+        try:
+            detection_algo = algorithms_used[detection_resp.algorithm.key]
+        except KeyError:
+            raise ValueError(
+                f"Detection algorithm {detection_resp.algorithm.key} is not a known algorithm. "
+                "The processing service must declare it in the /info endpoint. "
+                f"Known algorithms: {list(algorithms_used.keys())}"
+            )
+
         new_detection = Detection(
             source_image=source_image,
             bbox=serialized_bbox,
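Taken together, the two hunks above change the flow roughly as follows. This is a condensed, hypothetical paraphrase of get_or_create_detection, not the verbatim code:

# Condensed sketch: match by source image + bbox only; validate the
# detection algorithm only when a brand-new detection must be created.
def get_or_create_detection_sketch(source_image, detection_resp, algorithms_used):
    serialized_bbox = list(detection_resp.bbox.dict().values())
    existing = Detection.objects.filter(
        source_image=source_image,
        bbox=serialized_bbox,  # intentionally not filtered by detection_algorithm
    ).first()
    if existing:
        return existing  # re-used; new classifications are attached to it
    assert detection_resp.algorithm, "No detection algorithm specified"
    detection_algo = algorithms_used[detection_resp.algorithm.key]  # KeyError -> unknown algorithm
    return Detection(
        source_image=source_image,
        bbox=serialized_bbox,
        detection_algorithm=detection_algo,
    )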
@@ -1007,7 +1030,7 @@ def collect_images(
     )

     def choose_processing_service_for_pipeline(
-        self, job_id: int, pipeline_name: str, project_id: int
+        self, job_id: int | None, pipeline_name: str, project_id: int
     ) -> ProcessingService:
         # @TODO use the cached `last_checked_latency` and a max age to avoid checking every time
@@ -158,7 +158,7 @@ def test_created_category_maps(self):

     def test_alignment_of_predictions_and_category_map(self):
         # Ensure that the scores and labels are aligned
-        pipeline = self.processing_service_instance.pipelines.all().get(slug="random")
+        pipeline = self.processing_service_instance.pipelines.all().get(slug="random-detection-random-species")
         pipeline_response = pipeline.process_images(self.test_images, project_id=self.project.pk)
         results = save_results(pipeline_response, return_created=True)
         assert results is not None, "Expected results to be returned in a PipelineSaveResults object"
@@ -172,7 +172,7 @@ def test_alignment_of_predictions_and_category_map(self):

     def test_top_n_alignment(self):
         # Ensure that the top_n parameter works
-        pipeline = self.processing_service_instance.pipelines.all().get(slug="random")
+        pipeline = self.processing_service_instance.pipelines.all().get(slug="random-detection-random-species")
         pipeline_response = pipeline.process_images(self.test_images, project_id=self.project.pk)
         results = save_results(pipeline_response, return_created=True)
         assert results is not None, "Expecected results to be returned in a PipelineSaveResults object"
@@ -182,6 +182,60 @@ def test_top_n_alignment(self):
         assert classification.score == top_n[0]["score"]
         assert classification.taxon == top_n[0]["taxon"]

+    def test_pipeline_reprocessing(self):
+        """
+        Test that reprocessing the same images with different pipelines does not create duplicate
+        detections. The 2 pipelines used are a random detection + random species classifier, and a
+        constant species classifier.
+        """
+        # Process the images once
+        pipeline = self.processing_service_instance.pipelines.all().get(slug="random-detection-random-species")
+        pipeline_response = pipeline.process_images(self.test_images, project_id=self.project.pk)
+        results = save_results(pipeline_response, return_created=True)
+        assert results is not None, "Expected results to be returned in a PipelineSaveResults object"
+        assert results.detections, "Expected detections to be returned in the results"
+
+        # This particular pipeline produces 2 classifications per detection
+        for det in results.detections:
+            num_classifications = det.classifications.count()
+            assert (
+                num_classifications == 2
+            ), "Expected 2 classifications per detection (random species and random binary classifier)."
+
+        source_images = SourceImage.objects.filter(pk__in=[image.id for image in pipeline_response.source_images])
+        detections = Detection.objects.filter(source_image__in=source_images).select_related(
+            "detection_algorithm",
+            "detection_algorithm__category_map",
+        )
+        initial_detection_ids = sorted([det.pk for det in detections])
+        assert detections.count() > 0
+
+        # Reprocess the same images using a different pipeline
+        pipeline = self.processing_service_instance.pipelines.all().get(slug="constant")
+        pipeline_response = pipeline.process_images(self.test_images, project_id=self.project.pk)
+
+        reprocessed_results = save_results(pipeline_response, return_created=True)
+        assert reprocessed_results is not None, "Expected results to be returned in a PipelineSaveResults object"
+        assert reprocessed_results.detections, "Expected detections to be returned in the results"
+
+        source_images = SourceImage.objects.filter(pk__in=[image.id for image in pipeline_response.source_images])
+        detections = Detection.objects.filter(source_image__in=source_images).select_related(
+            "detection_algorithm",
+            "detection_algorithm__category_map",
+        )
+
+        # Check detections were re-processed, and not re-created
+        reprocessed_detection_ids = sorted([det.pk for det in detections])
+        assert initial_detection_ids == reprocessed_detection_ids, (
+            "Expected the same detections to be returned after reprocessing with a different pipeline, "
+            f"but found {initial_detection_ids} != {reprocessed_detection_ids}"
+        )
+
+        # The constant pipeline adds 1 more classification per detection
+        for detection in detections:
+            assert (
+                detection.classifications.count() == 3
+            ), "Expected 3 classifications per detection (2 random classifiers + constant classifier)."
+

 class TestPipeline(TestCase):
     def setUp(self):