From 08192fa25bd85fe3d18e08c70a6b7c4293d6690a Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 9 Sep 2025 17:48:20 -0700 Subject: [PATCH 01/15] feat: use enums for valid algorithm task types --- ami/ml/models/algorithm.py | 58 ++++++++++++++++++++++++-------------- ami/ml/models/pipeline.py | 2 +- 2 files changed, 38 insertions(+), 22 deletions(-) diff --git a/ami/ml/models/algorithm.py b/ami/ml/models/algorithm.py index 82753aac4..2e9bd6732 100644 --- a/ami/ml/models/algorithm.py +++ b/ami/ml/models/algorithm.py @@ -1,5 +1,6 @@ from __future__ import annotations +import enum from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -110,6 +111,31 @@ def with_category_count(self): return self.annotate(category_count=ArrayLength("category_map__labels")) +# Task types enum for better type checking +class AlgorithmTaskType(str, enum.Enum): + DETECTION = "detection" + LOCALIZATION = "localization" + SEGMENTATION = "segmentation" + CLASSIFICATION = "classification" + EMBEDDING = "embedding" + TRACKING = "tracking" + TAGGING = "tagging" + REGRESSION = "regression" + CAPTIONING = "captioning" + GENERATION = "generation" + TRANSLATION = "translation" + SUMMARIZATION = "summarization" + QUESTION_ANSWERING = "question_answering" + DEPTH_ESTIMATION = "depth_estimation" + POSE_ESTIMATION = "pose_estimation" + SIZE_ESTIMATION = "size_estimation" + OTHER = "other" + UNKNOWN = "unknown" + + def as_choice(self): + return (self.value, self.name.replace("_", " ").title()) + + @typing.final class Algorithm(BaseModel): """A machine learning algorithm""" @@ -120,28 +146,8 @@ class Algorithm(BaseModel): max_length=255, default="unknown", null=True, - choices=[ - ("detection", "Detection"), - ("localization", "Localization"), - ("segmentation", "Segmentation"), - ("classification", "Classification"), - ("embedding", "Embedding"), - ("tracking", "Tracking"), - ("tagging", "Tagging"), - ("regression", "Regression"), - ("captioning", "Captioning"), - ("generation", "Generation"), - ("translation", "Translation"), - ("summarization", "Summarization"), - ("question_answering", "Question Answering"), - ("depth_estimation", "Depth Estimation"), - ("pose_estimation", "Pose Estimation"), - ("size_estimation", "Size Estimation"), - ("other", "Other"), - ("unknown", "Unknown"), - ], + choices=[task_type.as_choice() for task_type in AlgorithmTaskType], ) - detection_algorithm_task_types = ["detection", "localization", "segmentation"] description = models.TextField(blank=True) version = models.IntegerField( default=1, @@ -172,6 +178,16 @@ class Algorithm(BaseModel): objects = AlgorithmQuerySet.as_manager() + detection_task_types = [ + AlgorithmTaskType.DETECTION, + AlgorithmTaskType.LOCALIZATION, + AlgorithmTaskType.SEGMENTATION, + ] + classification_task_types = [ + AlgorithmTaskType.CLASSIFICATION, + AlgorithmTaskType.TAGGING, + ] + def __str__(self): return f'#{self.pk} "{self.name}" ({self.key}) v{self.version}' diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index ed014f779..bc53c9697 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -69,7 +69,7 @@ def filter_processed_images( """ pipeline_algorithms = pipeline.algorithms.all() - detection_type_keys = Algorithm.detection_algorithm_task_types + detection_type_keys = Algorithm.detection_task_types detection_algorithms = pipeline_algorithms.filter(task_type__in=detection_type_keys) if not detection_algorithms.exists(): task_logger.warning(f"Pipeline {pipeline} has no detection algorithms saved. 
Will reprocess all images.") From df6f4d02a29d5a0f2a3f9a674a125d62126cc2ef Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 9 Sep 2025 17:49:42 -0700 Subject: [PATCH 02/15] feat: create labels hash automatically on create --- ami/ml/models/algorithm.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/ami/ml/models/algorithm.py b/ami/ml/models/algorithm.py index 2e9bd6732..961a664f2 100644 --- a/ami/ml/models/algorithm.py +++ b/ami/ml/models/algorithm.py @@ -16,6 +16,18 @@ from ami.base.models import BaseModel +@typing.final +class AlgorithmCategoryMapManager(models.Manager["AlgorithmCategoryMap"]): + def create(self, *args, **kwargs): + """ + Create a new AlgorithmCategoryMap instance and generate its labels_hash. + """ + instance = super().create(*args, **kwargs) + instance.labels_hash = instance.make_labels_hash(instance.labels) + instance.save() + return instance + + @typing.final class AlgorithmCategoryMap(BaseModel): """ @@ -45,6 +57,8 @@ class AlgorithmCategoryMap(BaseModel): algorithms: models.QuerySet[Algorithm] + objects = AlgorithmCategoryMapManager() + def __str__(self): return f"#{self.pk} with {len(self.labels)} classes ({self.version or 'unknown version'})" From 9e449e41a069dd30056de8d94408b58caf00a304 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 9 Sep 2025 17:52:06 -0700 Subject: [PATCH 03/15] feat: require that category maps & algorithms are created before processing --- ami/ml/models/pipeline.py | 153 ++++++++++++++++---------------------- 1 file changed, 63 insertions(+), 90 deletions(-) diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index bc53c9697..f3cd60a00 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -314,37 +314,11 @@ def get_or_create_algorithm_and_category_map( :param algorithm_configs: A dictionary of algorithms from the processing services' "/info" endpoint :param logger: A logger instance from the parent function - :return: A dictionary of algorithms used in the pipeline, keyed by the algorithm key + :return: A dictionary of algorithms registered in the pipeline, keyed by the algorithm key @TODO this should be called when registering a pipeline, not when saving results. But currently we don't have a way to register pipelines. """ - category_map = None - category_map_data = algorithm_config.category_map - if category_map_data: - labels_hash = AlgorithmCategoryMap.make_labels_hash(category_map_data.labels) - category_map, _created = AlgorithmCategoryMap.objects.get_or_create( - # @TODO this is creating a new category map every time - # Will create a new category map if the labels are different - labels_hash=labels_hash, - version=category_map_data.version, - defaults={ - "data": category_map_data.data, - "labels": category_map_data.labels, - "description": category_map_data.description, - "uri": category_map_data.uri, - }, - ) - if _created: - logger.info(f"Registered new category map {category_map}") - else: - logger.info(f"Assigned existing category map {category_map}") - else: - logger.warning( - f"No category map found for algorithm {algorithm_config.key} in response." - " Will attempt to create one from the classification results." 
- ) - algo, _created = Algorithm.objects.get_or_create( key=algorithm_config.key, version=algorithm_config.version, @@ -353,34 +327,58 @@ def get_or_create_algorithm_and_category_map( "task_type": algorithm_config.task_type, "version_name": algorithm_config.version_name, "uri": algorithm_config.uri, - "category_map": category_map or None, + "category_map": None, }, ) + if _created: + logger.info(f"Registered new algorithm {algo}") + else: + logger.info(f"Using algorithm {algo}") + + algo_fields_updated = [] + new_category_map = None + category_map_data = algorithm_config.category_map + + if (algo.category_map is None or len(algo.category_map.data) == 0) and category_map_data: + # New algorithms will not have a category map yet, and older ones may not either + # The category map data should be in the algorithm config from the /info endpoint + new_category_map = AlgorithmCategoryMap.objects.create( + version=category_map_data.version, + data=category_map_data.data, + labels=category_map_data.labels, + description=category_map_data.description, + uri=category_map_data.uri, + ) + algo.category_map = new_category_map + algo_fields_updated.append("category_map") + logger.info(f"Registered new category map {new_category_map} for algorithm {algo}") + else: + if algorithm_config.task_type in Algorithm.classification_task_types: + msg = ( + f"No valid category map found for algorithm {algorithm_config.key} with " + f"task type {algorithm_config.task_type} or in the pipeline /info response. " + "Update the processing service to include a category map for all classification algorithms " + "then re-register the pipelines." + ) + raise ValueError(msg) + else: + logger.debug(f"No category map found, but not required for task type {algorithm_config.task_type}") # Update fields that may have changed in the processing service, with a warning + # These are fields that we have added to the API since the algorithm was first created fields_to_update = { "task_type": algorithm_config.task_type, "uri": algorithm_config.uri, - "category_map": category_map, } - fields_updated = [] for field in fields_to_update: new_value = fields_to_update[field] if getattr(algo, field) != new_value: logger.warning(f"Field '{field}' changed for algorithm {algo} from {getattr(algo, field)} to {new_value}") setattr(algo, field, new_value) - fields_updated.append(field) - algo.save(update_fields=fields_updated) + algo_fields_updated.append(field) - if not algo.category_map or len(algo.category_map.data) == 0: - # Update existing algorithm that is missing a category map - algo.category_map = category_map - algo.save() - - if _created: - logger.info(f"Registered new algorithm {algo}") - else: - logger.info(f"Assigned algorithm {algo}") + if algo_fields_updated: + algo.save(update_fields=algo_fields_updated) return algo @@ -388,7 +386,7 @@ def get_or_create_algorithm_and_category_map( def get_or_create_detection( source_image: SourceImage, detection_resp: DetectionResponse, - algorithms_used: dict[str, Algorithm], + algorithms_known: dict[str, Algorithm], save: bool = True, logger: logging.Logger = logger, ) -> tuple[Detection, bool]: @@ -396,7 +394,7 @@ def get_or_create_detection( Create a Detection object from a DetectionResponse, or update an existing one. 
:param detection_resp: A DetectionResponse object - :param algorithms_used: A dictionary of algorithms used in the pipeline, keyed by the algorithm key + :param algorithms_known: A dictionary of algorithms registered in the pipeline, keyed by the algorithm key :param created_objects: A list to store created objects :return: A tuple of the Detection object and a boolean indicating whether it was created @@ -431,12 +429,12 @@ def get_or_create_detection( else: assert detection_resp.algorithm, f"No detection algorithm was specified for detection {detection_repr}" try: - detection_algo = algorithms_used[detection_resp.algorithm.key] + detection_algo = algorithms_known[detection_resp.algorithm.key] except KeyError: raise ValueError( f"Detection algorithm {detection_resp.algorithm.key} is not a known algorithm. " "The processing service must declare it in the /info endpoint. " - f"Known algorithms: {list(algorithms_used.keys())}" + f"Known algorithms: {list(algorithms_known.keys())}" ) new_detection = Detection( @@ -461,7 +459,7 @@ def get_or_create_detection( def create_detections( detections: list[DetectionResponse], - algorithms_used: dict[str, Algorithm], + algorithms_known: dict[str, Algorithm], logger: logging.Logger = logger, ) -> list[Detection]: """ @@ -469,7 +467,7 @@ def create_detections( Using bulk create. :param detections: A list of DetectionResponse objects - :param algorithms_used: A dictionary of algorithms used in the pipeline, keyed by the algorithm key + :param algorithms_known: A dictionary of algorithms registered in the pipeline, keyed by the algorithm key :param created_objects: A list to store created objects :return: A list of Detection objects @@ -489,7 +487,7 @@ def create_detections( detection, created = get_or_create_detection( source_image=source_image, detection_resp=detection_resp, - algorithms_used=algorithms_used, + algorithms_known=algorithms_known, save=False, logger=logger, ) @@ -581,7 +579,7 @@ def get_or_create_taxon_for_classification( def create_classification( detection: Detection, classification_resp: ClassificationResponse, - algorithms_used: dict[str, Algorithm], + algorithms_known: dict[str, Algorithm], save: bool = True, logger: logging.Logger = logger, ) -> tuple[Classification, bool]: @@ -590,7 +588,7 @@ def create_classification( :param detection: A Detection object :param classification: A ClassificationResponse object - :param algorithms_used: A dictionary of algorithms used in the pipeline, keyed by the algorithm key + :param algorithms_known: A dictionary of algorithms registered in the pipeline, keyed by the algorithm key :param created_objects: A list to store created objects :return: A tuple of the Classification object and a boolean indicating whether it was created @@ -601,12 +599,12 @@ def create_classification( logger.debug(f"Processing classification {classification_resp}") try: - classification_algo = algorithms_used[classification_resp.algorithm.key] + classification_algo = algorithms_known[classification_resp.algorithm.key] except KeyError: raise ValueError( f"Classification algorithm {classification_resp.algorithm.key} is not a known algorithm. " "The processing service must declare it in the /info endpoint. 
" - f"Known algorithms: {list(algorithms_used.keys())}" + f"Known algorithms: {list(algorithms_known.keys())}" ) if not classification_algo.category_map: @@ -686,7 +684,7 @@ def create_classification( def create_classifications( detections: list[Detection], detection_responses: list[DetectionResponse], - algorithms_used: dict[str, Algorithm], + algorithms_known: dict[str, Algorithm], logger: logging.Logger = logger, save: bool = True, ) -> list[Classification]: @@ -696,7 +694,7 @@ def create_classifications( :param detection: A Detection object :param classifications: A list of ClassificationResponse objects - :param algorithms_used: A dictionary of algorithms used in the pipeline, keyed by the algorithm key + :param algorithms_known: A dictionary of algorithms registered in the pipeline, keyed by the algorithm key :return: A list of Classification objects @@ -710,7 +708,7 @@ def create_classifications( classification, created = create_classification( detection=detection, classification_resp=classification_resp, - algorithms_used=algorithms_used, + algorithms_known=algorithms_known, save=False, logger=logger, ) @@ -826,7 +824,6 @@ def save_results( pipeline, _created = Pipeline.objects.get_or_create(slug=results.pipeline, defaults={"name": results.pipeline}) if _created: logger.warning(f"Pipeline choice returned by the Processing Service was not recognized! {pipeline}") - algorithms_used = set() job_logger = logger start_time = time.time() @@ -852,30 +849,19 @@ def save_results( f"The pipeline returned by the ML backend was not recognized, created a placeholder: {pipeline}" ) - # Algorithms and category maps should be created in advance when registering the pipeline & processing service - # however they are also currently available in each pipeline results response as well. - # @TODO review if we should only use the algorithms from the pre-registered pipeline config instead of the results - algorithms_used = { - algo_key: get_or_create_algorithm_and_category_map(algo_config, logger=job_logger) - for algo_key, algo_config in results.algorithms.items() - } - # Add all algorithms initially reported in the pipeline response to the pipeline - for algo in algorithms_used.values(): - pipeline.algorithms.add(algo) - - algos_reported = [f" {algo.task_type}: {algo_key} ({algo})\n" for algo_key, algo in algorithms_used.items()] - job_logger.info(f"Algorithms reported in pipeline response: \n{''.join(algos_reported)}") + algorithms_known: dict[str, Algorithm] = {algo.key: algo for algo in pipeline.algorithms.all()} + job_logger.info(f"Algorithms registered for pipeline: \n{''.join(algorithms_known.keys())}") detections = create_detections( detections=results.detections, - algorithms_used=algorithms_used, + algorithms_known=algorithms_known, logger=job_logger, ) classifications = create_classifications( detections=detections, detection_responses=results.detections, - algorithms_used=algorithms_used, + algorithms_known=algorithms_known, logger=job_logger, ) @@ -900,24 +886,6 @@ def save_results( event_ids = [img.event_id for img in source_images] # type: ignore update_calculated_fields_for_events(pks=event_ids) - registered_algos = pipeline.algorithms.all() - # Add any algorithms that were reported in a detection response that were not reported in the pipeline response - # This is important for tracking what objects were processed by which algorithms - # to avoid reprocessing, and for tracking provenance. 
- for algo in algorithms_used.values(): - if algo not in registered_algos: - pipeline.algorithms.add(algo) - job_logger.debug(f"Added algorithm {algo} to pipeline {pipeline}") - - # Warn if the algorithms used in the pipeline response are not the same as the ones registered in the pipeline - if len(algorithms_used) != len(registered_algos): - job_logger.warning( - f"Pipeline {pipeline} has {len(registered_algos)} algorithms registered, but {len(algorithms_used)} " - "algorithms were used in the results response. " - "\n*** Update the registered pipeline to match the algorithms used in the results *** \n" - "otherwise images will always be reprocessed in future runs." - ) - total_time = time.time() - start_time job_logger.info(f"Saved results from pipeline {pipeline} in {total_time:.2f} seconds") @@ -925,6 +893,11 @@ def save_results( """ By default, return None because celery tasks need special handling to return objects. """ + # Collect only algorithms that were actually used in detections or classifications + detection_algos = {det.detection_algorithm for det in detections if det.detection_algorithm} + classification_algos = {clss.algorithm for clss in classifications if clss.algorithm} + algorithms_used: dict[str, Algorithm] = {algo.key: algo for algo in detection_algos | classification_algos} + return PipelineSaveResults( pipeline=pipeline, source_images=source_images, @@ -982,7 +955,7 @@ class Pipeline(BaseModel): description = models.TextField(blank=True) version = models.IntegerField(default=1) version_name = models.CharField(max_length=255, blank=True) - # @TODO the algorithms attribute is not currently used. Review for removal. + # @TODO add support for ordered algorithms in the pipeline, for know the order is only in the stages config algorithms = models.ManyToManyField("ml.Algorithm", related_name="pipelines") stages: list[PipelineStage] = SchemaField( default=default_stages, From 1d087f2abfe7d06a5f49281834ffd9104dc922b8 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 9 Sep 2025 22:50:48 -0700 Subject: [PATCH 04/15] fix: logic for checking existing algorithms, introduce named exception --- ami/ml/exceptions.py | 2 ++ ami/ml/models/algorithm.py | 7 +++++ ami/ml/models/pipeline.py | 54 ++++++++++++++++++++------------------ 3 files changed, 37 insertions(+), 26 deletions(-) create mode 100644 ami/ml/exceptions.py diff --git a/ami/ml/exceptions.py b/ami/ml/exceptions.py new file mode 100644 index 000000000..ef94e1a59 --- /dev/null +++ b/ami/ml/exceptions.py @@ -0,0 +1,2 @@ +class PipelineNotConfigured(ValueError): + pass diff --git a/ami/ml/models/algorithm.py b/ami/ml/models/algorithm.py index 961a664f2..bb62c5001 100644 --- a/ami/ml/models/algorithm.py +++ b/ami/ml/models/algorithm.py @@ -227,3 +227,10 @@ def category_count(self) -> int | None: but is defined here for the serializer to work. 
""" return None + + def has_valid_category_map(self): + return ( + (self.category_map is not None) + and (self.category_map.data is not None) + and (len(self.category_map.data) > 0) + ) diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index f3cd60a00..e0a189dbf 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -36,6 +36,7 @@ update_calculated_fields_for_events, update_occurrence_determination, ) +from ami.ml.exceptions import PipelineNotConfigured from ami.ml.models.algorithm import Algorithm, AlgorithmCategoryMap from ami.ml.schemas import ( AlgorithmConfigResponse, @@ -333,36 +334,37 @@ def get_or_create_algorithm_and_category_map( if _created: logger.info(f"Registered new algorithm {algo}") else: - logger.info(f"Using algorithm {algo}") + logger.info(f"Using existing algorithm {algo}") algo_fields_updated = [] new_category_map = None category_map_data = algorithm_config.category_map - if (algo.category_map is None or len(algo.category_map.data) == 0) and category_map_data: - # New algorithms will not have a category map yet, and older ones may not either - # The category map data should be in the algorithm config from the /info endpoint - new_category_map = AlgorithmCategoryMap.objects.create( - version=category_map_data.version, - data=category_map_data.data, - labels=category_map_data.labels, - description=category_map_data.description, - uri=category_map_data.uri, - ) - algo.category_map = new_category_map - algo_fields_updated.append("category_map") - logger.info(f"Registered new category map {new_category_map} for algorithm {algo}") - else: - if algorithm_config.task_type in Algorithm.classification_task_types: - msg = ( - f"No valid category map found for algorithm {algorithm_config.key} with " - f"task type {algorithm_config.task_type} or in the pipeline /info response. " - "Update the processing service to include a category map for all classification algorithms " - "then re-register the pipelines." + if not algo.has_valid_category_map(): + if category_map_data: + # New algorithms will not have a category map yet, and older ones may not either + # The category map data should be in the algorithm config from the /info endpoint + new_category_map = AlgorithmCategoryMap.objects.create( + version=category_map_data.version, + data=category_map_data.data, + labels=category_map_data.labels, + description=category_map_data.description, + uri=category_map_data.uri, ) - raise ValueError(msg) + algo.category_map = new_category_map + algo_fields_updated.append("category_map") + logger.info(f"Registered new category map {new_category_map} for algorithm {algo}") else: - logger.debug(f"No category map found, but not required for task type {algorithm_config.task_type}") + if algorithm_config.task_type in Algorithm.classification_task_types: + msg = ( + f"No valid category map found for algorithm '{algorithm_config.key}' with " + f"task type '{algorithm_config.task_type}' or in the pipeline /info response. " + "Update the processing service to include a category map for all classification algorithms " + "then re-register the pipelines." 
+ ) + raise PipelineNotConfigured(msg) + else: + logger.debug(f"No category map found, but not required for task type {algorithm_config.task_type}") # Update fields that may have changed in the processing service, with a warning # These are fields that we have added to the API since the algorithm was first created @@ -431,7 +433,7 @@ def get_or_create_detection( try: detection_algo = algorithms_known[detection_resp.algorithm.key] except KeyError: - raise ValueError( + raise PipelineNotConfigured( f"Detection algorithm {detection_resp.algorithm.key} is not a known algorithm. " "The processing service must declare it in the /info endpoint. " f"Known algorithms: {list(algorithms_known.keys())}" @@ -601,7 +603,7 @@ def create_classification( try: classification_algo = algorithms_known[classification_resp.algorithm.key] except KeyError: - raise ValueError( + raise PipelineNotConfigured( f"Classification algorithm {classification_resp.algorithm.key} is not a known algorithm. " "The processing service must declare it in the /info endpoint. " f"Known algorithms: {list(algorithms_known.keys())}" @@ -1083,7 +1085,7 @@ def process_images(self, images: typing.Iterable[SourceImage], project_id: int, processing_service = self.choose_processing_service_for_pipeline(job_id, self.name, project_id) if not processing_service.endpoint_url: - raise ValueError( + raise PipelineNotConfigured( f"No endpoint URL configured for this pipeline's processing service ({processing_service})" ) From bfbb750624ff6c70e9998a2e59f32302c2b14564 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 9 Sep 2025 22:51:47 -0700 Subject: [PATCH 05/15] feat: allow passing existing pipeline configs when registering, renames --- ami/ml/models/processing_service.py | 10 ++++++++-- ami/ml/schemas.py | 18 +++++++++++++----- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/ami/ml/models/processing_service.py b/ami/ml/models/processing_service.py index 2129f40a5..cf774a460 100644 --- a/ami/ml/models/processing_service.py +++ b/ami/ml/models/processing_service.py @@ -12,7 +12,12 @@ from ami.main.models import Project from ami.ml.models.pipeline import Pipeline, get_or_create_algorithm_and_category_map from ami.ml.models.project_pipeline_config import ProjectPipelineConfig -from ami.ml.schemas import PipelineRegistrationResponse, ProcessingServiceInfoResponse, ProcessingServiceStatusResponse +from ami.ml.schemas import ( + PipelineConfigResponse, + PipelineRegistrationResponse, + ProcessingServiceInfoResponse, + ProcessingServiceStatusResponse, +) logger = logging.getLogger(__name__) @@ -52,11 +57,12 @@ def create_pipelines( self, enable_only: list[str] | None = None, projects: models.QuerySet[Project] | None = None, + pipeline_configs: list[PipelineConfigResponse] | None = None, ) -> PipelineRegistrationResponse: """ Register pipeline choices in Antenna using the pipeline configurations from the processing service API. """ - pipeline_configs = self.get_pipeline_configs() + pipeline_configs = pipeline_configs or self.get_pipeline_configs() pipelines_to_add = pipeline_configs # all of them pipelines = [] diff --git a/ami/ml/schemas.py b/ami/ml/schemas.py index 7f5a5c9a9..3314112c2 100644 --- a/ami/ml/schemas.py +++ b/ami/ml/schemas.py @@ -208,8 +208,16 @@ class PipelineStage(pydantic.BaseModel): description: str | None = None -class PipelineConfig(pydantic.BaseModel): - """A configurable pipeline.""" +class PipelineConfigResponse(pydantic.BaseModel): + """ + Details of a pipeline available in the processing service. 
+ + Includes the algorithm (model) definitions used in the pipeline, and + their category maps (class lists). + + This must be retrieved from the processing service API and saved in Antenna + before images are submitted for processing. + """ name: str slug: str @@ -226,7 +234,7 @@ class ProcessingServiceInfoResponse(pydantic.BaseModel): name: str description: str | None = None - pipelines: list[PipelineConfig] = [] + pipelines: list[PipelineConfigResponse] = [] algorithms: list[AlgorithmConfigResponse] = [] @@ -237,7 +245,7 @@ class ProcessingServiceStatusResponse(pydantic.BaseModel): timestamp: datetime.datetime request_successful: bool - pipeline_configs: list[PipelineConfig] = [] + pipeline_configs: list[PipelineConfigResponse] = [] error: str | None = None server_live: bool | None = None pipelines_online: list[str] = [] @@ -249,6 +257,6 @@ class PipelineRegistrationResponse(pydantic.BaseModel): timestamp: datetime.datetime success: bool error: str | None = None - pipelines: list[PipelineConfig] = [] + pipelines: list[PipelineConfigResponse] = [] pipelines_created: list[str] = [] algorithms_created: list[str] = [] From 0c1c887b9f8ceae36057d58427a5378263d7f197 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 9 Sep 2025 22:53:52 -0700 Subject: [PATCH 06/15] feat: update tests for new behavior (algos & maps must be pre-registered) --- ami/ml/tests.py | 134 +++++++++++++++++---------------------------- docker-compose.yml | 1 + 2 files changed, 52 insertions(+), 83 deletions(-) diff --git a/ami/ml/tests.py b/ami/ml/tests.py index 9f133a953..82567fcb9 100644 --- a/ami/ml/tests.py +++ b/ami/ml/tests.py @@ -5,7 +5,7 @@ from rest_framework.test import APIRequestFactory, APITestCase from ami.base.serializers import reverse_with_params -from ami.main.models import Classification, Detection, Project, SourceImage, SourceImageCollection +from ami.main.models import Detection, Project, SourceImage, SourceImageCollection from ami.ml.models import Algorithm, Pipeline, ProcessingService from ami.ml.models.pipeline import collect_images, get_or_create_algorithm_and_category_map, save_results from ami.ml.schemas import ( @@ -156,6 +156,39 @@ def test_created_category_maps(self): assert detection.occurrence assert detection.occurrence.determination in classification_taxa + def test_missing_category_map(self): + # Test that an exception is raised if a classification algorithm is missing a category map + from ami.ml.exceptions import PipelineNotConfigured + + # Get the response from the /info endpoint + pipeline_configs = self.processing_service.get_pipeline_configs() + + # Assert that there is a least one classification algorithm with a category map + self.assertTrue( + any( + algo.task_type in Algorithm.classification_task_types and algo.category_map is not None + for pipeline in pipeline_configs + for algo in pipeline.algorithms + ), + "Expected pipeline to have at least one classification algorithm with a category map", + ) + + # Remove the category map from one of the classification algorithms + for pipeline_config in pipeline_configs: + for algorithm in pipeline_config.algorithms: + if algorithm.task_type in Algorithm.classification_task_types and algorithm.category_map is not None: + algorithm.category_map = None + # Change the key to ensure it's treated as a new algorithm + algorithm.key = "missing-category-map-classifier" + algorithm.name = "Classifier with Missing Category Map" + break + + with self.assertRaises( + PipelineNotConfigured, + msg="Expected an exception to be raised if a 
classification algorithm is missing a category map", + ): + self.processing_service.create_pipelines(pipeline_configs=pipeline_configs) + def test_alignment_of_predictions_and_category_map(self): # Ensure that the scores and labels are aligned pipeline = self.processing_service_instance.pipelines.all().get(slug="random-detection-random-species") @@ -481,6 +514,13 @@ def test_skip_existing_per_batch_during_processing(self): pass def test_unknown_algorithm_returned_by_processing_service(self): + """ + Test that unknown algorithms returned by the processing service are handled correctly. + + Previously we allowed unknown algorithms to be returned by the pipeline, + now all algorithms must be registered first from the processing service's /info + endpoint. + """ fake_results = self.fake_pipeline_results(self.test_images, self.pipeline) new_detector = AlgorithmConfigResponse( @@ -501,92 +541,20 @@ def test_unknown_algorithm_returned_by_processing_service(self): current_total_algorithm_count = Algorithm.objects.count() - # @TODO assert a warning was logged - save_results(fake_results) + # Ensure an exception is raised that a new algorithm was not + # pre-registered from the /info endpoint + from ami.ml.exceptions import PipelineNotConfigured - # Ensure new algorithms were added to the database + with self.assertRaises(PipelineNotConfigured): + save_results(fake_results) + + # Ensure no new algorithms were added to the database new_algorithm_count = Algorithm.objects.count() - self.assertEqual(new_algorithm_count, current_total_algorithm_count + 2) + self.assertEqual(new_algorithm_count, current_total_algorithm_count) # Ensure new algorithms were also added to the pipeline - self.assertTrue(self.pipeline.algorithms.filter(name=new_detector.name, key=new_detector.key).exists()) - self.assertTrue(self.pipeline.algorithms.filter(name=new_classifier.name, key=new_classifier.key).exists()) - - @unittest.skip("Not implemented yet") - def test_reprocessing_after_unknown_algorithm_added(self): - # @TODO fix issue with "None" algorithm on some detections - - images = list(collect_images(collection=self.image_collection, pipeline=self.pipeline)) - - save_results(self.fake_pipeline_results(images, self.pipeline)) - - new_detector = AlgorithmConfigResponse( - name="Unknown Detector 5.1b-mobile", key="unknown-detector", task_type="detection" - ) - new_classifier = AlgorithmConfigResponse( - name="Unknown Classifier 3.0b-mega", key="unknown-classifier", task_type="classification" - ) - - fake_results = self.fake_pipeline_results(images, self.pipeline) - - # Change the algorithm names to unknown ones - for detection in fake_results.detections: - detection.algorithm = AlgorithmReference(name=new_detector.name, key=new_detector.key) - - for classification in detection.classifications: - classification.algorithm = AlgorithmReference(name=new_classifier.name, key=new_classifier.key) - - fake_results.algorithms[new_detector.key] = new_detector - fake_results.algorithms[new_classifier.key] = new_classifier - - # print("FAKE RESULTS") - # print(fake_results) - # print("END FAKE RESULTS") - - saved_objects = save_results(fake_results, return_created=True) - assert saved_objects is not None - saved_detections = saved_objects.detections - saved_classifications = saved_objects.classifications - - for obj in saved_detections: - assert obj.detection_algorithm # For type checker, not the test - - # Ensure the new detector was used for the detection - self.assertEqual(obj.detection_algorithm.name, new_detector.name) - - # 
Ensure each detection has classification objects - self.assertTrue(obj.classifications.exists()) - - # Ensure detection has a correct classification object - for classification in obj.classifications.all(): - self.assertIn(classification, saved_classifications) - - for obj in saved_classifications: - assert obj.algorithm # For type checker, not the test - - # Ensure the new classifier was used for the classification - self.assertEqual(obj.algorithm.name, new_classifier.name) - - # Ensure each classification has the correct detection object - self.assertIn(obj.detection, saved_detections, "Wrong detection object for classification object.") - - # Ensure new algorithms were added to the pipeline - self.assertTrue(self.pipeline.algorithms.filter(name=new_detector.name).exists()) - self.assertTrue(self.pipeline.algorithms.filter(name=new_classifier.name).exists()) - - detection_algos_used = Detection.objects.all().values_list("detection_algorithm__name", flat=True).distinct() - self.assertTrue(new_detector.name in detection_algos_used) - # Ensure None is not in the list - self.assertFalse(None in detection_algos_used) - classification_algos_used = Classification.objects.all().values_list("algorithm__name", flat=True) - self.assertTrue(new_classifier.name in classification_algos_used) - # Ensure None is not in the list - self.assertFalse(None in classification_algos_used) - - # The algorithms are new, but they were registered to the pipeline, so the images should be skipped. - images_again = list(collect_images(collection=self.image_collection, pipeline=self.pipeline)) - remaining_images_to_process = len(images_again) - self.assertEqual(remaining_images_to_process, 0) + # self.assertTrue(self.pipeline.algorithms.filter(name=new_detector.name, key=new_detector.key).exists()) + # self.assertTrue(self.pipeline.algorithms.filter(name=new_classifier.name, key=new_classifier.key).exists()) def test_yes_reprocess_if_new_terminal_algorithm_same_intermediate(self): """ diff --git a/docker-compose.yml b/docker-compose.yml index e1588478d..ff9d125f0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,6 +23,7 @@ services: - postgres - redis - minio-init + - ml_backend volumes: - .:/app:z env_file: From 8e2659a5e86a09333f8949368b9bc7cef6b62619 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 9 Sep 2025 23:41:02 -0700 Subject: [PATCH 07/15] feat: deprecate the inclusion of algorithm details in result responses --- ami/ml/models/pipeline.py | 7 +++++++ ami/ml/schemas.py | 8 ++++++-- processing_services/example/api/pipelines.py | 2 +- processing_services/example/api/schemas.py | 10 +++++++--- processing_services/minimal/api/api.py | 2 +- processing_services/minimal/api/schemas.py | 10 +++++++--- 6 files changed, 29 insertions(+), 10 deletions(-) diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index e0a189dbf..dc849ff0f 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -854,6 +854,13 @@ def save_results( algorithms_known: dict[str, Algorithm] = {algo.key: algo for algo in pipeline.algorithms.all()} job_logger.info(f"Algorithms registered for pipeline: \n{''.join(algorithms_known.keys())}") + if results.algorithms: + logger.warning( + "Algorithms were returned by the processing service in the results, these will be ignored and " + "they should be removed to increase performance. " + "Algorithms and category maps must be registered before processing, using /info endpoint." 
+ ) + detections = create_detections( detections=results.detections, algorithms_known=algorithms_known, diff --git a/ami/ml/schemas.py b/ami/ml/schemas.py index 3314112c2..c555ef943 100644 --- a/ami/ml/schemas.py +++ b/ami/ml/schemas.py @@ -182,8 +182,12 @@ class PipelineResultsResponse(pydantic.BaseModel): pipeline: str algorithms: dict[str, AlgorithmConfigResponse] = pydantic.Field( default_factory=dict, - description="A dictionary of all algorithms used in the pipeline, including their class list and other " - "metadata, keyed by the algorithm key.", + description=( + "A dictionary of all algorithms used in the pipeline, including their class list and other " + "metadata, keyed by the algorithm key. " + "DEPRECATED: Algorithms should only be provided in the ProcessingServiceInfoResponse." + ), + depreciated=True, ) total_time: float source_images: list[SourceImageResponse] diff --git a/processing_services/example/api/pipelines.py b/processing_services/example/api/pipelines.py index 44f4a0c21..70860f611 100644 --- a/processing_services/example/api/pipelines.py +++ b/processing_services/example/api/pipelines.py @@ -133,7 +133,7 @@ def _get_pipeline_response(self, detections: list[Detection], elapsed_time: floa return PipelineResultsResponse( pipeline=self.config.slug, # type: ignore - algorithms={algorithm.key: algorithm for algorithm in self.config.algorithms}, + # algorithms={algorithm.key: algorithm for algorithm in self.config.algorithms}, total_time=elapsed_time, source_images=source_image_responses, detections=detection_responses, diff --git a/processing_services/example/api/schemas.py b/processing_services/example/api/schemas.py index af7ccb8ac..05682c6f1 100644 --- a/processing_services/example/api/schemas.py +++ b/processing_services/example/api/schemas.py @@ -264,12 +264,16 @@ class Config: class PipelineResultsResponse(pydantic.BaseModel): pipeline: PipelineChoice + total_time: float algorithms: dict[str, AlgorithmConfigResponse] = pydantic.Field( default_factory=dict, - description="A dictionary of all algorithms used in the pipeline, including their class list and other " - "metadata, keyed by the algorithm key.", + description=( + "A dictionary of all algorithms used in the pipeline, including their class list and other " + "metadata, keyed by the algorithm key. " + "DEPRECATED: Algorithms should only be provided in the ProcessingServiceInfoResponse." 
+        ),
+        deprecated=True,
     )
-    total_time: float
     source_images: list[SourceImageResponse]
     detections: list[DetectionResponse]
     errors: list | str | None = None
diff --git a/processing_services/minimal/api/api.py b/processing_services/minimal/api/api.py
index 9aa2d6b28..400156894 100644
--- a/processing_services/minimal/api/api.py
+++ b/processing_services/minimal/api/api.py
@@ -116,7 +116,7 @@ async def process(data: PipelineRequest) -> PipelineResultsResponse:
     response = PipelineResultsResponse(
         pipeline=pipeline_slug,
-        algorithms={algorithm.key: algorithm for algorithm in pipeline.config.algorithms},
+        # algorithms={algorithm.key: algorithm for algorithm in pipeline.config.algorithms},
         source_images=source_image_results,
         detections=results,
         total_time=seconds_elapsed,
diff --git a/processing_services/minimal/api/schemas.py b/processing_services/minimal/api/schemas.py
index ffdc29a07..b0febba1b 100644
--- a/processing_services/minimal/api/schemas.py
+++ b/processing_services/minimal/api/schemas.py
@@ -229,12 +229,16 @@ class Config:
 
 class PipelineResultsResponse(pydantic.BaseModel):
     pipeline: PipelineChoice
+    total_time: float
     algorithms: dict[str, AlgorithmConfigResponse] = pydantic.Field(
         default_factory=dict,
-        description="A dictionary of all algorithms used in the pipeline, including their class list and other "
-        "metadata, keyed by the algorithm key.",
+        description=(
+            "A dictionary of all algorithms used in the pipeline, including their class list and other "
+            "metadata, keyed by the algorithm key. "
+            "DEPRECATED: Algorithms should only be provided in the ProcessingServiceInfoResponse."
+        ),
+        deprecated=True,
     )
-    total_time: float
     source_images: list[SourceImageResponse]
     detections: list[DetectionResponse]

From f906d54b4b8dd3cc9d974170668a95fa7ccf8cac Mon Sep 17 00:00:00 2001
From: Michael Bunsen
Date: Tue, 9 Sep 2025 23:42:14 -0700
Subject: [PATCH 08/15] feat: migration for merging duplicate category maps

---
 .../0023_merge_duplicate_category_maps.py     | 147 ++++++++++++++++++
 1 file changed, 147 insertions(+)
 create mode 100644 ami/ml/migrations/0023_merge_duplicate_category_maps.py

diff --git a/ami/ml/migrations/0023_merge_duplicate_category_maps.py b/ami/ml/migrations/0023_merge_duplicate_category_maps.py
new file mode 100644
index 000000000..82a1fb851
--- /dev/null
+++ b/ami/ml/migrations/0023_merge_duplicate_category_maps.py
@@ -0,0 +1,147 @@
+# Generated on 2025-09-09 for merging duplicate AlgorithmCategoryMaps
+
+from django.db import migrations
+from django.db.models import Count
+import logging
+import json
+
+
+logger = logging.getLogger(__name__)
+
+
+def merge_duplicate_category_maps(apps, schema_editor):
+    """
+    Find duplicate AlgorithmCategoryMaps based on their `data` field,
+    compare them (description, version, associated Algorithms, associated Classifications),
+    choose a keeper, then reassign Classifications and Algorithms to use the keeper.
+    Then delete the duplicates.
+ """ + AlgorithmCategoryMap = apps.get_model("ml", "AlgorithmCategoryMap") + Algorithm = apps.get_model("ml", "Algorithm") + Classification = apps.get_model("main", "Classification") + + # Group category maps by their data content (JSON field) + # We'll use a dictionary to group by serialized data + data_groups = {} + + for category_map in AlgorithmCategoryMap.objects.all(): + # Normalize the data for comparison by converting to a sorted JSON string + normalized_data = json.dumps(category_map.data, sort_keys=True) + + if normalized_data not in data_groups: + data_groups[normalized_data] = [] + data_groups[normalized_data].append(category_map) + + # Process each group that has duplicates + duplicates_found = 0 + maps_merged = 0 + + for normalized_data, category_maps in data_groups.items(): + if len(category_maps) <= 1: + continue # Skip groups with only one category map + + duplicates_found += len(category_maps) - 1 + logger.info(f"Found {len(category_maps)} duplicate category maps with data hash") + + # Choose the keeper - prioritize by: + # 1. Has description + # 2. Has version + # 3. Most associated algorithms + # 4. Most associated classifications + # 5. Earliest created (as tie-breaker) + + def score_category_map(cm): + score = 0 + + # Has description + if cm.description: + score += 1000 + + # Has version + if cm.version: + score += 500 + + # Count associated algorithms + algorithm_count = Algorithm.objects.filter(category_map=cm).count() + score += algorithm_count * 100 + + # Count associated classifications + classification_count = Classification.objects.filter(category_map=cm).count() + score += classification_count * 10 + + # Prefer older records (negative timestamp for sorting) + score -= cm.created_at.timestamp() / 1000000 # Small adjustment for tie-breaking + + return score + + # Sort by score (highest first) and pick the keeper + sorted_maps = sorted(category_maps, key=score_category_map, reverse=True) + keeper = sorted_maps[0] + duplicates = sorted_maps[1:] + + logger.info(f"Keeping category map #{keeper.pk}, merging {len(duplicates)} duplicates") + + # Merge data from duplicates to keeper + for duplicate in duplicates: + # Update algorithms pointing to the duplicate + algorithms_updated = Algorithm.objects.filter(category_map=duplicate).update(category_map=keeper) + logger.info(f"Updated {algorithms_updated} algorithms from category map #{duplicate.pk} to #{keeper.pk}") + + # Update classifications pointing to the duplicate + classifications_updated = Classification.objects.filter(category_map=duplicate).update(category_map=keeper) + logger.info( + f"Updated {classifications_updated} classifications from category map #{duplicate.pk} to #{keeper.pk}" + ) + + # If duplicate has better description or version, update keeper + if not keeper.description and duplicate.description: + keeper.description = duplicate.description + logger.info(f"Updated keeper description from duplicate #{duplicate.pk}") + + if not keeper.version and duplicate.version: + keeper.version = duplicate.version + logger.info(f"Updated keeper version from duplicate #{duplicate.pk}") + + if not keeper.uri and duplicate.uri: + keeper.uri = duplicate.uri + logger.info(f"Updated keeper URI from duplicate #{duplicate.pk}") + + # Save keeper with any merged data + keeper.save() + + # Delete the duplicates + for duplicate in duplicates: + logger.info(f"Deleting duplicate category map #{duplicate.pk}") + duplicate.delete() + + maps_merged += len(duplicates) + + logger.info( + f"Migration completed: {duplicates_found} 
duplicates found, {maps_merged} category maps merged and deleted" + ) + + +def reverse_merge_duplicate_category_maps(apps, schema_editor): + """ + This migration cannot be easily reversed since we deleted duplicate data. + The reverse operation would require restoring the deleted category maps + and reassigning relationships, which is not feasible without backup data. + """ + raise NotImplementedError( + "This migration cannot be reversed as it permanently deletes duplicate " + "AlgorithmCategoryMap instances. If you need to reverse this, restore from a backup." + ) + + +class Migration(migrations.Migration): + dependencies = [ + ("ml", "0022_alter_pipeline_default_config"), + ("main", "0053_alter_classification_algorithm"), # Ensure Classification model is available + ] + + operations = [ + migrations.RunPython( + merge_duplicate_category_maps, + reverse_merge_duplicate_category_maps, + ), + ] From 9cedebb7a0362f8008e821a03632ae2317d79471 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 9 Sep 2025 23:46:40 -0700 Subject: [PATCH 09/15] feat: update category map relationships in old classifications --- ...x_classifications_missing_category_maps.py | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 ami/ml/migrations/0024_fix_classifications_missing_category_maps.py diff --git a/ami/ml/migrations/0024_fix_classifications_missing_category_maps.py b/ami/ml/migrations/0024_fix_classifications_missing_category_maps.py new file mode 100644 index 000000000..9c7c11e91 --- /dev/null +++ b/ami/ml/migrations/0024_fix_classifications_missing_category_maps.py @@ -0,0 +1,80 @@ +# Generated on 2025-09-09 for fixing unlinked Classifications + +from django.db import migrations +import logging + + +logger = logging.getLogger(__name__) + + +def fix_unlinked_classifications(apps, schema_editor): + """ + Fix classifications that are missing category_map references but have + algorithms with category_maps. This typically happens with legacy data + created before the automatic category_map assignment was implemented. 
+ """ + Classification = apps.get_model("main", "Classification") + + # Find classifications that need fixing + unlinked_classifications = Classification.objects.filter( + category_map__isnull=True, algorithm__category_map__isnull=False + ) + + total_unlinked = unlinked_classifications.count() + logger.info(f"Found {total_unlinked:,} classifications missing category_map but with algorithm that has one") + + if total_unlinked == 0: + logger.info("No unlinked classifications found - migration complete") + return + + # Group by algorithm to do bulk updates more efficiently + algorithms_with_unlinked = unlinked_classifications.values_list("algorithm_id", flat=True).distinct() + + total_fixed = 0 + + for algorithm_id in algorithms_with_unlinked: + # Get the algorithm's category_map + algorithm_classifications = unlinked_classifications.filter(algorithm_id=algorithm_id) + first_classification = algorithm_classifications.first() + + if not first_classification or not first_classification.algorithm: + continue + + category_map = first_classification.algorithm.category_map + if not category_map: + continue + + # Bulk update all classifications for this algorithm + updated_count = algorithm_classifications.update(category_map=category_map) + total_fixed += updated_count + + logger.info( + f"Updated {updated_count:,} classifications for algorithm #{algorithm_id} to use category_map #{category_map.pk}" + ) + + logger.info(f"Migration completed: Fixed {total_fixed:,} unlinked classifications") + + +def reverse_fix_unlinked_classifications(apps, schema_editor): + """ + This migration fixes data consistency issues and should not be reversed. + However, if needed, this would set category_map back to null for classifications + that were updated by this migration. + """ + logger.warning("Reversing this migration would create data inconsistency - not recommended") + # We could implement a reversal if absolutely necessary, but it's not recommended + # since this migration fixes legitimate data consistency issues + + +class Migration(migrations.Migration): + dependencies = [ + ("ml", "0023_merge_duplicate_category_maps"), + ("main", "0053_alter_classification_algorithm"), # Ensure Classification model is available + ] + + operations = [ + migrations.RunPython( + fix_unlinked_classifications, + reverse_fix_unlinked_classifications, + ), + ] From 42a9ebf31dd8168ddcfe8e806c0d0c1a49a4b525 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 16 Sep 2025 18:43:06 -0700 Subject: [PATCH 10/15] feat: ensure labels hash is always generated with save method. --- ami/ml/models/algorithm.py | 14 -------------- ami/ml/tests.py | 27 +++++++++++++++++++++++++++ 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/ami/ml/models/algorithm.py b/ami/ml/models/algorithm.py index bb62c5001..0b65ec4ef 100644 --- a/ami/ml/models/algorithm.py +++ b/ami/ml/models/algorithm.py @@ -16,18 +16,6 @@ from ami.base.models import BaseModel -@typing.final -class AlgorithmCategoryMapManager(models.Manager["AlgorithmCategoryMap"]): - def create(self, *args, **kwargs): - """ - Create a new AlgorithmCategoryMap instance and generate its labels_hash. 
- """ - instance = super().create(*args, **kwargs) - instance.labels_hash = instance.make_labels_hash(instance.labels) - instance.save() - return instance - - @typing.final class AlgorithmCategoryMap(BaseModel): """ @@ -57,8 +45,6 @@ class AlgorithmCategoryMap(BaseModel): algorithms: models.QuerySet[Algorithm] - objects = AlgorithmCategoryMapManager() - def __str__(self): return f"#{self.pk} with {len(self.labels)} classes ({self.version or 'unknown version'})" diff --git a/ami/ml/tests.py b/ami/ml/tests.py index 82567fcb9..6fc8d8078 100644 --- a/ami/ml/tests.py +++ b/ami/ml/tests.py @@ -669,3 +669,30 @@ def test_algorithm_category_maps(self): # Ensure the full labels in the data match the simple, ordered list of labels sorted_data = sorted(algorithm.category_map.data, key=lambda x: x["index"]) assert [category["label"] for category in sorted_data] == algorithm.category_map.labels + + def test_labels_hash_auto_generation(self): + """Test that labels_hash is automatically generated when creating AlgorithmCategoryMap instances.""" + from ami.ml.models import AlgorithmCategoryMap + + # Test data + test_labels = ["coleoptera", "diptera", "lepidoptera"] + test_data = [ + {"index": 0, "label": "coleoptera"}, + {"index": 1, "label": "diptera"}, + {"index": 2, "label": "lepidoptera"}, + ] + + # Create instance using objects.create() + category_map = AlgorithmCategoryMap.objects.create(labels=test_labels, data=test_data, version="test-v1") + + # Verify labels_hash was automatically generated + self.assertIsNotNone(category_map.labels_hash) + + # Verify the hash matches what make_labels_hash would produce + expected_hash = AlgorithmCategoryMap.make_labels_hash(test_labels) + self.assertEqual(category_map.labels_hash, expected_hash) + + # Test that creating another instance with same labels produces same hash + category_map2 = AlgorithmCategoryMap.objects.create(labels=test_labels, data=test_data, version="test-v2") + + self.assertEqual(category_map.labels_hash, category_map2.labels_hash) From 41ef9e1f45900ad519a75eec036d75d69068fe48 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 16 Sep 2025 19:02:41 -0700 Subject: [PATCH 11/15] feat: docs and convenience methods to explain category map schema --- ami/ml/models/algorithm.py | 43 ++++++++++++++++++++++++++++++++++++++ ami/ml/tests.py | 21 ++++++++++++++++++- 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/ami/ml/models/algorithm.py b/ami/ml/models/algorithm.py index 0b65ec4ef..73adc4648 100644 --- a/ami/ml/models/algorithm.py +++ b/ami/ml/models/algorithm.py @@ -20,6 +20,41 @@ class AlgorithmCategoryMap(BaseModel): """ A list of classification labels for a given algorithm version + + Expected schema for `data` field. This is the primary "category map" used by the model + to map from the category index in the model output to a human-readable label and other metadata. + + IMPORTANT: Currently only `label` & `taxon_rank` are imported to the Taxon model if the taxon does + not already exist in the Antenna database. But the Taxon model can store any metadata, so this is + extensible in the future. + [ + { + "index": 0, + "gbif_key": 123456, + "label": "Vanessa atalanta", + "taxon_rank": "SPECIES", + }, + { + "index": 1, + "gbif_key": 789012, + "label": "Limenitis", + "taxon_rank": "GENUS", + }, + { + "id": 3, + "gbif_key": 345678, + "label": "Nymphalis californica", + "taxon_rank": "SPECIES", + } + ] + + The labels field is a simple list of string labels the correct index order used by the model. 
+ [ + "Vanessa atalanta", + "Limenitis", + "Nymphalis californica", + ] + """ data = models.JSONField( @@ -55,6 +90,14 @@ def make_labels_hash(cls, labels): """ return hash("".join(labels)) + @classmethod + def labels_from_data(cls, data, label_field="label"): + return [category[label_field] for category in data] + + @classmethod + def data_from_labels(cls, labels, label_field="label"): + return [{"index": i, label_field: label} for i, label in enumerate(labels)] + def get_category(self, label, label_field="label"): # Can use JSON containment operators return self.data.index(next(category for category in self.data if category[label_field] == label)) diff --git a/ami/ml/tests.py b/ami/ml/tests.py index 6fc8d8078..d91fdf4c2 100644 --- a/ami/ml/tests.py +++ b/ami/ml/tests.py @@ -675,12 +675,12 @@ def test_labels_hash_auto_generation(self): from ami.ml.models import AlgorithmCategoryMap # Test data - test_labels = ["coleoptera", "diptera", "lepidoptera"] test_data = [ {"index": 0, "label": "coleoptera"}, {"index": 1, "label": "diptera"}, {"index": 2, "label": "lepidoptera"}, ] + test_labels = AlgorithmCategoryMap.labels_from_data(test_data) # Create instance using objects.create() category_map = AlgorithmCategoryMap.objects.create(labels=test_labels, data=test_data, version="test-v1") @@ -696,3 +696,22 @@ def test_labels_hash_auto_generation(self): category_map2 = AlgorithmCategoryMap.objects.create(labels=test_labels, data=test_data, version="test-v2") self.assertEqual(category_map.labels_hash, category_map2.labels_hash) + + def test_labels_data_conversion_methods(self): + from ami.ml.models import AlgorithmCategoryMap + + # Test data + test_data = [ + {"index": 0, "label": "coleoptera"}, + {"index": 1, "label": "diptera"}, + {"index": 2, "label": "lepidoptera"}, + ] + test_labels = AlgorithmCategoryMap.labels_from_data(test_data) + + # Convert labels to data and back + converted_data = AlgorithmCategoryMap.data_from_labels(test_labels) + converted_labels = AlgorithmCategoryMap.labels_from_data(converted_data) + + # Verify conversions are correct + self.assertEqual(test_data, converted_data) + self.assertEqual(test_labels, converted_labels) From 2ebaefe93d212704aeed8cc144e83c426b108253 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 16 Sep 2025 21:18:02 -0700 Subject: [PATCH 12/15] fix: return category maps from all models in example processing service --- processing_services/example/api/algorithms.py | 127 +++++++++++++----- processing_services/example/api/pipelines.py | 20 +-- 2 files changed, 106 insertions(+), 41 deletions(-) diff --git a/processing_services/example/api/algorithms.py b/processing_services/example/api/algorithms.py index 3f9b5ace9..8a80038dd 100644 --- a/processing_services/example/api/algorithms.py +++ b/processing_services/example/api/algorithms.py @@ -42,15 +42,28 @@ def compile(self): def run(self, inputs: list[SourceImage] | list[Detection]) -> list[Detection]: raise NotImplementedError("Subclasses must implement the run method") - algorithm_config_response = AlgorithmConfigResponse( - name="Base Algorithm", - key="base", - task_type="base", - description="A base class for all algorithms.", - version=1, - version_name="v1", - category_map=None, - ) + def get_category_map(self) -> AlgorithmCategoryMapResponse: + return AlgorithmCategoryMapResponse( + data=[], + labels=[], + version="v1", + description="A model without labels.", + uri=None, + ) + + def get_algorithm_config_response(self) -> AlgorithmConfigResponse: + return AlgorithmConfigResponse( + name="Base 
From 2ebaefe93d212704aeed8cc144e83c426b108253 Mon Sep 17 00:00:00 2001
From: Michael Bunsen
Date: Tue, 16 Sep 2025 21:18:02 -0700
Subject: [PATCH 12/15] fix: return category maps from all models in example processing service
---
 processing_services/example/api/algorithms.py | 127 +++++++++++++-----
 processing_services/example/api/pipelines.py  |  20 +--
 2 files changed, 106 insertions(+), 41 deletions(-)

diff --git a/processing_services/example/api/algorithms.py b/processing_services/example/api/algorithms.py
index 3f9b5ace9..8a80038dd 100644
--- a/processing_services/example/api/algorithms.py
+++ b/processing_services/example/api/algorithms.py
@@ -42,15 +42,28 @@ def compile(self):
     def run(self, inputs: list[SourceImage] | list[Detection]) -> list[Detection]:
         raise NotImplementedError("Subclasses must implement the run method")
 
-    algorithm_config_response = AlgorithmConfigResponse(
-        name="Base Algorithm",
-        key="base",
-        task_type="base",
-        description="A base class for all algorithms.",
-        version=1,
-        version_name="v1",
-        category_map=None,
-    )
+    def get_category_map(self) -> AlgorithmCategoryMapResponse:
+        return AlgorithmCategoryMapResponse(
+            data=[],
+            labels=[],
+            version="v1",
+            description="A model without labels.",
+            uri=None,
+        )
+
+    def get_algorithm_config_response(self) -> AlgorithmConfigResponse:
+        return AlgorithmConfigResponse(
+            name="Base Algorithm",
+            key="base",
+            task_type="base",
+            description="A base class for all algorithms.",
+            version=1,
+            version_name="v1",
+            category_map=self.get_category_map(),
+        )
+
+    def __init__(self):
+        self.algorithm_config_response = self.get_algorithm_config_response()
 
 
 class ZeroShotObjectDetector(Algorithm):
@@ -141,18 +154,28 @@ def run(self, source_images: list[SourceImage], intermediate=False) -> list[Detection]:
 
         return detector_responses
 
-    algorithm_config_response = AlgorithmConfigResponse(
-        name="Zero Shot Object Detector",
-        key="zero-shot-object-detector",
-        task_type="detection",
-        description=(
-            "Huggingface Zero Shot Object Detection model."
-            "Produces both a bounding box and a candidate label classification for each detection."
-        ),
-        version=1,
-        version_name="v1",
-        category_map=None,
-    )
+    def get_category_map(self) -> AlgorithmCategoryMapResponse:
+        return AlgorithmCategoryMapResponse(
+            data=[{"index": i, "label": label} for i, label in enumerate(self.candidate_labels)],
+            labels=self.candidate_labels,
+            version="v1",
+            description="Candidate labels used for zero-shot object detection.",
+            uri=None,
+        )
+
+    def get_algorithm_config_response(self) -> AlgorithmConfigResponse:
+        return AlgorithmConfigResponse(
+            name="Zero Shot Object Detector",
+            key="zero-shot-object-detector",
+            task_type="detection",
+            description=(
+                "Huggingface Zero Shot Object Detection model. "
+                "Produces both a bounding box and a candidate label classification for each detection."
+            ),
+            version=1,
+            version_name="v1",
+            category_map=self.get_category_map(),
+        )
 
 
 class HFImageClassifier(Algorithm):
@@ -160,6 +183,8 @@ class HFImageClassifier(Algorithm):
     A local classifier that uses the Hugging Face pipeline to classify images.
     """
 
+    model_name: str = "google/vit-base-patch16-224"  # Vision Transformer model trained on ImageNet-1k
+
     def compile(self):
         saved_models_key = "hf_image_classifier"  # generate a key for each uniquely compiled algorithm
 
@@ -167,7 +192,7 @@
             from transformers import pipeline
 
             logger.info(f"Compiling {self.algorithm_config_response.name} from scratch...")
-            self.model = pipeline("image-classification", model="google/vit-base-patch16-224")
+            self.model = pipeline("image-classification", model=self.model_name, device=get_best_device())
             SAVED_MODELS[saved_models_key] = self.model
         else:
             logger.info(f"Using saved model for {self.algorithm_config_response.name}...")
@@ -216,15 +241,55 @@ def run(self, detections: list[Detection]) -> list[Detection]:
 
         return detections_to_return
 
-    algorithm_config_response = AlgorithmConfigResponse(
-        name="HF Image Classifier",
-        key="hf-image-classifier",
-        task_type="classification",
-        description="HF ViT for image classification.",
-        version=1,
-        version_name="v1",
-        category_map=None,
-    )
+    def get_category_map(self) -> AlgorithmCategoryMapResponse:
+        """
+        Extract the category map from the model.
+        Returns an AlgorithmCategoryMapResponse with labels, data, and model information.
+        """
+        from transformers.models.auto.configuration_auto import AutoConfig
+
+        logger.info(f"Loading configuration for {self.model_name}")
+        config = AutoConfig.from_pretrained(self.model_name)
+
+        # Extract label information
+        if not hasattr(config, "id2label") or not config.id2label:
+            raise ValueError(
+                f"Cannot create category map for model {self.model_name}, no id2label mapping found in config"
+            )
+        else:
+            # Sort labels by index
+            # Ensure keys are strings for consistent access
+            id2label: dict[str, str] = {str(k): v for k, v in config.id2label.items()}
+            indices = sorted([int(k) for k in id2label.keys()])
+
+            # Create labels and data
+            labels = [id2label[str(i)] for i in indices]
+            data = [{"label": label, "index": idx} for idx, label in zip(indices, labels)]
+
+            # Build description
+            description_text = (
+                f"Vision Transformer model trained on ImageNet-1k. "
+                f"Contains {len(labels)} object classes. Model: {self.model_name}"
+            )
+
+            return AlgorithmCategoryMapResponse(
+                data=data,
+                labels=labels,
+                version="ImageNet-1k",
+                description=description_text,
+                uri=f"https://huggingface.co/{self.model_name}",
+            )
+
+    def get_algorithm_config_response(self) -> AlgorithmConfigResponse:
+        return AlgorithmConfigResponse(
+            name="HF Image Classifier",
+            key="hf-image-classifier",
+            task_type="classification",
+            description="HF ViT for image classification.",
+            version=1,
+            version_name="v1",
+            category_map=self.get_category_map(),
+        )
 
 
 class RandomSpeciesClassifier(Algorithm):
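Building the classifier's category map from the checkpoint's own `id2label` mapping keeps the served labels in sync with whatever model is actually loaded, rather than hard-coding a label list. The same extraction as a standalone sketch, for reference (requires the `transformers` package and access to the Hugging Face hub or a local cache):

```python
from transformers import AutoConfig

model_name = "google/vit-base-patch16-224"
config = AutoConfig.from_pretrained(model_name)

# id2label keys may be ints or strings depending on how the config was
# serialized, so normalize to strings before sorting by index.
id2label = {str(k): v for k, v in config.id2label.items()}
indices = sorted(int(k) for k in id2label)
labels = [id2label[str(i)] for i in indices]
data = [{"index": i, "label": label} for i, label in zip(indices, labels)]

print(len(labels))  # 1000 classes for this ImageNet-1k checkpoint
```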
diff --git a/processing_services/example/api/pipelines.py b/processing_services/example/api/pipelines.py
index 70860f611..02b31d0d9 100644
--- a/processing_services/example/api/pipelines.py
+++ b/processing_services/example/api/pipelines.py
@@ -153,8 +153,8 @@ class ZeroShotHFClassifierPipeline(Pipeline):
         description=("Zero Shot Object Detector with HF image classifier."),
         version=1,
         algorithms=[
-            ZeroShotObjectDetector.algorithm_config_response,
-            HFImageClassifier.algorithm_config_response,
+            ZeroShotObjectDetector().algorithm_config_response,
+            HFImageClassifier().algorithm_config_response,
         ],
     )
 
@@ -167,7 +167,7 @@ def get_stages(self) -> list[Algorithm]:
             zero_shot_object_detector.candidate_labels = self.request_config["candidate_labels"]
         self.config.algorithms = [
             zero_shot_object_detector.algorithm_config_response,
-            HFImageClassifier.algorithm_config_response,
+            HFImageClassifier().algorithm_config_response,
         ]
         return [zero_shot_object_detector, HFImageClassifier()]
 
@@ -212,7 +212,7 @@ class ZeroShotObjectDetectorPipeline(Pipeline):
         slug="zero-shot-object-detector-pipeline",
         description=("Zero shot object detector (bbox and classification)."),
         version=1,
-        algorithms=[ZeroShotObjectDetector.algorithm_config_response],
+        algorithms=[ZeroShotObjectDetector().algorithm_config_response],
     )
 
     def get_stages(self) -> list[Algorithm]:
@@ -254,8 +254,8 @@ class ZeroShotObjectDetectorWithRandomSpeciesClassifierPipeline(Pipeline):
         description=("HF zero shot object detector with random species classifier."),
         version=1,
         algorithms=[
-            ZeroShotObjectDetector.algorithm_config_response,
-            RandomSpeciesClassifier.algorithm_config_response,
+            ZeroShotObjectDetector().algorithm_config_response,
+            RandomSpeciesClassifier().algorithm_config_response,
         ],
     )
 
@@ -266,7 +266,7 @@ def get_stages(self) -> list[Algorithm]:
 
         self.config.algorithms = [
             zero_shot_object_detector.algorithm_config_response,
-            RandomSpeciesClassifier.algorithm_config_response,
+            RandomSpeciesClassifier().algorithm_config_response,
         ]
         return [zero_shot_object_detector, RandomSpeciesClassifier()]
 
@@ -307,8 +307,8 @@ class ZeroShotObjectDetectorWithConstantClassifierPipeline(Pipeline):
         description=("HF zero shot object detector with constant classifier."),
         version=1,
         algorithms=[
-            ZeroShotObjectDetector.algorithm_config_response,
-            ConstantClassifier.algorithm_config_response,
+            ZeroShotObjectDetector().algorithm_config_response,
+            ConstantClassifier().algorithm_config_response,
         ],
     )
 
@@ -319,7 +319,7 @@ def get_stages(self) -> list[Algorithm]:
 
         self.config.algorithms = [
             zero_shot_object_detector.algorithm_config_response,
-            ConstantClassifier.algorithm_config_response,
+            ConstantClassifier().algorithm_config_response,
        ]
        return [zero_shot_object_detector, ConstantClassifier()]
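The pipelines now instantiate each algorithm to obtain its config, since the response (and its category map) can depend on instance state such as request-supplied candidate labels. A sketch of the resulting behaviour (the import path is hypothetical, and the class is assumed to define default `candidate_labels`; since `__init__` caches the config before any mutation, the method must be called again after changing instance state):

```python
# Sketch only; assumes the example service's classes are importable.
from api.algorithms import ZeroShotObjectDetector

detector = ZeroShotObjectDetector()
detector.candidate_labels = ["moth", "beetle", "background"]

# detector.algorithm_config_response was cached in __init__, before the
# labels changed; rebuild the config so the category map reflects the
# current candidate labels.
config = detector.get_algorithm_config_response()
assert config.category_map is not None
assert config.category_map.labels == ["moth", "beetle", "background"]
```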
From d06a14a548fc8221dbb09a7d142d637ca1f485fa Mon Sep 17 00:00:00 2001
From: Michael Bunsen
Date: Tue, 16 Sep 2025 21:32:17 -0700
Subject: [PATCH 13/15] fix: slightly improve the top_n taxa response
---
 ami/main/models.py         |  5 +++--
 ami/ml/models/algorithm.py | 14 +++++++++-----
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/ami/main/models.py b/ami/main/models.py
index 88da476ae..fd632a44d 100644
--- a/ami/main/models.py
+++ b/ami/main/models.py
@@ -2220,14 +2220,15 @@ def top_n(self, n: int = 3) -> list[dict[str, "Taxon | float | None"]]:
         """Return top N taxa and scores for this classification."""
         if not self.category_map:
             logger.warning(
-                f"Classification {self.pk}'s algrorithm ({self.algorithm_id} has no catgory map, "
+                f"Classification {self.pk}'s algorithm ({self.algorithm_id}) has no category map, "
                 "can't get top N predictions."
             )
             return []
 
         top_scored = self.top_scores_with_index(n)  # (index, score) pairs
         indexes = [idx for idx, _ in top_scored]
-        category_data = self.category_map.with_taxa(only_indexes=indexes)
+        category_data: list[dict] = self.category_map.with_taxa(only_indexes=indexes)
+        assert category_data is not None
         index_to_taxon = {cat["index"]: cat["taxon"] for cat in category_data}
 
         return [
diff --git a/ami/ml/models/algorithm.py b/ami/ml/models/algorithm.py
index 73adc4648..2379e1e9e 100644
--- a/ami/ml/models/algorithm.py
+++ b/ami/ml/models/algorithm.py
@@ -102,26 +102,30 @@ def get_category(self, label, label_field="label"):
         # Can use JSON containment operators
         return self.data.index(next(category for category in self.data if category[label_field] == label))
 
-    def with_taxa(self, category_field="label", only_indexes: list[int] | None = None):
+    def with_taxa(self, category_field="label", only_indexes: list[int] | None = None) -> list[dict]:
         """
         Add Taxon objects to the category map, or None if no match
 
         :param category_field: The field in the category data to match against the Taxon name
         :return: The category map with the taxon objects added
 
-        @TODO need a top_n parameter to limit the number of taxa to fetch
-        @TODO consider creating missing taxa?
+        @TODO consider creating missing taxa in batch? the top 1 taxon is saved when a classification is created, but
+        not the rest of the taxa in the category map, so the top_n response will often have missing taxa.
+        @TODO this needs refactoring and optimization
         """
        from ami.main.models import Taxon
 
        if only_indexes:
-            labels_data = [self.data[i] for i in only_indexes]
+            labels_data: list[dict] = [category for category in self.data if category["index"] in only_indexes]
             labels_label = [self.labels[i] for i in only_indexes]
         else:
-            labels_data = self.data
+            labels_data: list[dict] = self.data
             labels_label = self.labels
 
+        if not labels_label or not labels_data:
+            raise ValueError("No label data found in category map data")
+
         # @TODO standardize species search / lookup.
         # See similar query in ml.models.pipeline.get_or_create_taxon_for_classification()
         taxa = Taxon.objects.filter(
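With `with_taxa()` now typed to return `list[dict]` and filtering by each category's `index` field rather than by list position, the top-N flow reads roughly as follows (a sketch, assuming a saved `Classification` instance with a populated category map; `taxon` is `None` for labels with no matching `Taxon` record):

```python
top_scored = classification.top_scores_with_index(3)  # [(index, score), ...]
indexes = [idx for idx, _ in top_scored]

# Resolve only the requested category indexes to Taxon objects (or None)
category_data = classification.category_map.with_taxa(only_indexes=indexes)
index_to_taxon = {cat["index"]: cat["taxon"] for cat in category_data}

top_n = [
    {"taxon": index_to_taxon.get(idx), "score": score}
    for idx, score in top_scored
]
```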
From da6f435320eed3acb075670ccc89d3b8dad68a65 Mon Sep 17 00:00:00 2001
From: Michael Bunsen
Date: Tue, 16 Sep 2025 22:07:50 -0700
Subject: [PATCH 14/15] Update ami/ml/tests.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 ami/ml/tests.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/ami/ml/tests.py b/ami/ml/tests.py
index d91fdf4c2..d14c04dfb 100644
--- a/ami/ml/tests.py
+++ b/ami/ml/tests.py
@@ -553,8 +553,6 @@ def test_unknown_algorithm_returned_by_processing_service(self):
 
         self.assertEqual(new_algorithm_count, current_total_algorithm_count)
 
         # Ensure new algorithms were also added to the pipeline
-        # self.assertTrue(self.pipeline.algorithms.filter(name=new_detector.name, key=new_detector.key).exists())
-        # self.assertTrue(self.pipeline.algorithms.filter(name=new_classifier.name, key=new_classifier.key).exists())
 
     def test_yes_reprocess_if_new_terminal_algorithm_same_intermediate(self):
         """

From 357244a66147c65a4dea6f322c4d25f398f28195 Mon Sep 17 00:00:00 2001
From: Michael Bunsen
Date: Tue, 16 Sep 2025 22:08:02 -0700
Subject: [PATCH 15/15] Update ami/ml/models/pipeline.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 ami/ml/models/pipeline.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py
index dc849ff0f..cb51af31a 100644
--- a/ami/ml/models/pipeline.py
+++ b/ami/ml/models/pipeline.py
@@ -852,7 +852,7 @@ def save_results(
         )
 
         algorithms_known: dict[str, Algorithm] = {algo.key: algo for algo in pipeline.algorithms.all()}
-        job_logger.info(f"Algorithms registered for pipeline: \n{''.join(algorithms_known.keys())}")
+        job_logger.info(f"Algorithms registered for pipeline: \n{', '.join(algorithms_known.keys())}")
 
         if results.algorithms:
             logger.warning(