From edb4e74f7d661cfe614c15cfc036b6635dde50ca Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 19 Mar 2025 12:35:32 -0700 Subject: [PATCH 01/10] fix: cleanup comments, fix type checking --- ami/ml/models/pipeline.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index b8202d776..f7315db3d 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -3,7 +3,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from ami.ml.models import ProcessingService # , ProjectPipelineConfig + from ami.ml.models import ProcessingService, ProjectPipelineConfig import collections import dataclasses @@ -162,9 +162,6 @@ def process_images( ) -> PipelineResultsResponse: """ Process images using ML pipeline API. - - @TODO find a home for this function. - @TODO break into task chunks. """ job = None task_logger = logger @@ -927,7 +924,7 @@ class Pipeline(BaseModel): "main.Project", related_name="pipelines", blank=True, through="ml.ProjectPipelineConfig" ) processing_services: models.QuerySet[ProcessingService] - # project_pipeline_configs: models.QuerySet[ProjectPipelineConfig] + project_pipeline_configs: models.QuerySet[ProjectPipelineConfig] class Meta: ordering = ["name", "version"] From e576a9f21a4a4a6ead53e500edffaefc3aff7da0 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 19 Mar 2025 12:39:40 -0700 Subject: [PATCH 02/10] fix: clean up logging & types when sending project pipeline config --- ami/ml/models/pipeline.py | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index f7315db3d..952007360 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -198,29 +198,21 @@ def process_images( if url ] + config = {} if project_id: try: - config = pipeline.project_pipeline_configs.get(project_id=project_id).config + project_pipeline_config = 
pipeline.project_pipeline_configs.get(project_id=project_id) + config = project_pipeline_config.config or {} task_logger.info( f"Sending pipeline request using {config} from the project-pipeline config " f"for Pipeline {pipeline} and Project id {project_id}." ) except pipeline.project_pipeline_configs.model.DoesNotExist as e: - task_logger.error( - f"Error getting the project-pipeline config for Pipeline {pipeline} " - f"and Project id {project_id}: {e}" - ) - config = {} - task_logger.info( - "Using empty config when sending pipeline request since no project-pipeline config " - f"was found for Pipeline {pipeline} and Project id {project_id}" + task_logger.warning( + f"No project-pipeline config for Pipeline {pipeline} " f"and Project id {project_id}: {e}" ) else: - config = {} - task_logger.info( - "Using empty config when sending pipeline request " - f"since no project id was provided for Pipeline {pipeline}" - ) + task_logger.warning(f"Pipeline {pipeline} is not associated with a project") request_data = PipelineRequest( pipeline=pipeline.slug, From 7cf9b9593ce65526605828911518d8c7cef47c87 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 19 Mar 2025 12:44:46 -0700 Subject: [PATCH 03/10] fix: comments & type checking --- ami/ml/models/pipeline.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index 952007360..74cdfff4a 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -4,6 +4,7 @@ if TYPE_CHECKING: from ami.ml.models import ProcessingService, ProjectPipelineConfig + from ami.jobs.models import Job import collections import dataclasses @@ -903,7 +904,7 @@ class Pipeline(BaseModel): description = models.TextField(blank=True) version = models.IntegerField(default=1) version_name = models.CharField(max_length=255, blank=True) - # @TODO the algorithms list be retrieved by querying the pipeline endpoint + # @TODO the algorithms attribute is not currently used. 
Review for removal. algorithms = models.ManyToManyField("ml.Algorithm", related_name="pipelines") stages: list[PipelineStage] = SchemaField( default=default_stages, @@ -917,6 +918,7 @@ class Pipeline(BaseModel): ) processing_services: models.QuerySet[ProcessingService] project_pipeline_configs: models.QuerySet[ProjectPipelineConfig] + jobs: models.QuerySet[Job] class Meta: ordering = ["name", "version"] From fbe2b9c329478b205217190a0d9891fb519c2eee Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 19 Mar 2025 14:50:37 -0700 Subject: [PATCH 04/10] fix: add timeout for processing service checks --- ami/ml/models/processing_service.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ami/ml/models/processing_service.py b/ami/ml/models/processing_service.py index 26684652f..d6a1bd0e5 100644 --- a/ami/ml/models/processing_service.py +++ b/ami/ml/models/processing_service.py @@ -101,7 +101,7 @@ def create_pipelines(self): algorithms_created=algorithms_created, ) - def get_status(self): + def get_status(self, timeout=6): """ Check the status of the processing service. This is a simple health check that pings the /readyz endpoint of the service. @@ -116,7 +116,7 @@ def get_status(self): resp = None try: - resp = requests.get(ready_check_url) + resp = requests.get(ready_check_url, timeout=timeout) resp.raise_for_status() self.last_checked_live = True latency = time.time() - start_time @@ -158,13 +158,13 @@ def get_status(self): return response - def get_pipeline_configs(self): + def get_pipeline_configs(self, timeout=6): """ Get the pipeline configurations from the processing service. This can be a long response as it includes the full category map for each algorithm. 
""" info_url = urljoin(self.endpoint_url, "info") - resp = requests.get(info_url) + resp = requests.get(info_url, timeout=timeout) resp.raise_for_status() info_data = ProcessingServiceInfoResponse.parse_obj(resp.json()) return info_data.pipelines From eda17b9b6e83ad0744245c23491f488e43409a8a Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 19 Mar 2025 15:02:50 -0700 Subject: [PATCH 05/10] fix: names of AppConfig classes --- ami/jobs/apps.py | 2 +- ami/labelstudio/apps.py | 2 +- ami/ml/apps.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ami/jobs/apps.py b/ami/jobs/apps.py index 6c222128f..fc96ad525 100644 --- a/ami/jobs/apps.py +++ b/ami/jobs/apps.py @@ -2,6 +2,6 @@ from django.utils.translation import gettext_lazy as _ -class UsersConfig(AppConfig): +class JobsConfig(AppConfig): name = "ami.jobs" verbose_name = _("Jobs") diff --git a/ami/labelstudio/apps.py b/ami/labelstudio/apps.py index ba5c95752..46e943844 100644 --- a/ami/labelstudio/apps.py +++ b/ami/labelstudio/apps.py @@ -2,6 +2,6 @@ from django.utils.translation import gettext_lazy as _ -class UsersConfig(AppConfig): +class LabelStudioConfig(AppConfig): name = "ami.labelstudio" verbose_name = _("Label Studio Integration") diff --git a/ami/ml/apps.py b/ami/ml/apps.py index fc64a2439..6b6752c1c 100644 --- a/ami/ml/apps.py +++ b/ami/ml/apps.py @@ -2,6 +2,6 @@ from django.utils.translation import gettext_lazy as _ -class UsersConfig(AppConfig): +class MLConfig(AppConfig): name = "ami.ml" verbose_name = _("Machine Learning") From c9510d38ea1e7b6085576a0d6c91578b77c6beb3 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 19 Mar 2025 15:03:42 -0700 Subject: [PATCH 06/10] feat: create a default config object for pipelines --- ami/jobs/models.py | 11 ++--- .../0021_pipeline_default_config.py | 24 ++++++++++ ami/ml/models/pipeline.py | 47 +++++++++++++------ ami/ml/schemas.py | 15 +++++- ami/ml/tests.py | 24 ++++++++++ processing_services/example/api/schemas.py | 16 ++++++- 6 
files changed, 114 insertions(+), 23 deletions(-) create mode 100644 ami/ml/migrations/0021_pipeline_default_config.py diff --git a/ami/jobs/models.py b/ami/jobs/models.py index eab3b0b13..c5853689d 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -399,10 +399,9 @@ def run(cls, job: "Job"): total_detections = 0 total_classifications = 0 - # Set to low size because our response JSON just got enormous - # @TODO make this configurable - CHUNK_SIZE = 1 - chunks = [images[i : i + CHUNK_SIZE] for i in range(0, image_count, CHUNK_SIZE)] # noqa + config = job.pipeline.get_config(project_id=job.project.pk) + chunk_size = config.get("request_source_image_batch_size", 1) + chunks = [images[i : i + chunk_size] for i in range(0, image_count, chunk_size)] # noqa request_failed_images = [] for i, chunk in enumerate(chunks): @@ -434,9 +433,9 @@ def run(cls, job: "Job"): "process", status=JobState.STARTED, progress=(i + 1) / len(chunks), - processed=min((i + 1) * CHUNK_SIZE, image_count), + processed=min((i + 1) * chunk_size, image_count), failed=len(request_failed_images), - remaining=max(image_count - ((i + 1) * CHUNK_SIZE), 0), + remaining=max(image_count - ((i + 1) * chunk_size), 0), ) # count the completed, successful, and failed save_tasks: diff --git a/ami/ml/migrations/0021_pipeline_default_config.py b/ami/ml/migrations/0021_pipeline_default_config.py new file mode 100644 index 000000000..5258a4006 --- /dev/null +++ b/ami/ml/migrations/0021_pipeline_default_config.py @@ -0,0 +1,24 @@ +# Generated by Django 4.2.10 on 2025-03-19 16:27 + +import ami.ml.schemas +from django.db import migrations +import django_pydantic_field.fields + + +class Migration(migrations.Migration): + dependencies = [ + ("ml", "0020_projectpipelineconfig_alter_pipeline_projects"), + ] + + operations = [ + migrations.AddField( + model_name="pipeline", + name="default_config", + field=django_pydantic_field.fields.PydanticSchemaField( + config=None, + default=dict, + help_text="The default 
configuration for the pipeline. Used by both the job sending images to the pipeline and the processing service.", + schema=ami.ml.schemas.PipelineRequestConfigParameters, + ), + ), + ] diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index 74cdfff4a..17f67632c 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -41,6 +41,7 @@ ClassificationResponse, DetectionResponse, PipelineRequest, + PipelineRequestConfigParameters, PipelineResultsResponse, SourceImageRequest, SourceImageResponse, @@ -199,26 +200,13 @@ def process_images( if url ] - config = {} - if project_id: - try: - project_pipeline_config = pipeline.project_pipeline_configs.get(project_id=project_id) - config = project_pipeline_config.config or {} - task_logger.info( - f"Sending pipeline request using {config} from the project-pipeline config " - f"for Pipeline {pipeline} and Project id {project_id}." - ) - except pipeline.project_pipeline_configs.model.DoesNotExist as e: - task_logger.warning( - f"No project-pipeline config for Pipeline {pipeline} " f"and Project id {project_id}: {e}" - ) - else: + if not project_id: task_logger.warning(f"Pipeline {pipeline} is not associated with a project") request_data = PipelineRequest( pipeline=pipeline.slug, source_images=source_images, - config=config, + config=pipeline.get_config(project_id=project_id), ) session = create_session() @@ -916,6 +904,15 @@ class Pipeline(BaseModel): projects = models.ManyToManyField( "main.Project", related_name="pipelines", blank=True, through="ml.ProjectPipelineConfig" ) + default_config: PipelineRequestConfigParameters = SchemaField( + schema=PipelineRequestConfigParameters, + default=dict, + help_text=( + "The default configuration for the pipeline. " + "Used by both the job sending images to the pipeline " + "and the processing service." 
+ ), + ) processing_services: models.QuerySet[ProcessingService] project_pipeline_configs: models.QuerySet[ProjectPipelineConfig] jobs: models.QuerySet[Job] @@ -930,6 +927,26 @@ class Meta: def __str__(self): return f'#{self.pk} "{self.name}" ({self.slug}) v{self.version}' + def get_config(self, project_id: int | None = None) -> PipelineRequestConfigParameters: + """ + Get the configuration for the pipeline request. + + Returns a copy of pipeline.default_config; if a project ID is provided, + the project's pipeline config overrides the defaults. self.default_config is never mutated. + """ + config = PipelineRequestConfigParameters(self.default_config or {})  # copy, never mutate the field + if project_id: + try: + project_pipeline_config = self.project_pipeline_configs.get(project_id=project_id) + if project_pipeline_config.config: + config.update(project_pipeline_config.config) + logger.debug( + f"Using ProjectPipelineConfig for Pipeline {self} and Project #{project_id}: config: {config}" + ) + except self.project_pipeline_configs.model.DoesNotExist as e: + logger.warning(f"No project-pipeline config for Pipeline {self} " f"and Project #{project_id}: {e}") + return config + def collect_images( self, collection: SourceImageCollection | None = None, diff --git a/ami/ml/schemas.py b/ami/ml/schemas.py index 64172a56f..b248f28eb 100644 --- a/ami/ml/schemas.py +++ b/ami/ml/schemas.py @@ -144,10 +144,23 @@ class Config: ] +class PipelineRequestConfigParameters(dict): + """Parameters used to configure a pipeline request. + + Accepts any serializable key-value pair. + Example: {"force_reprocess": True, "auth_token": "abc123"} + + Supported parameters are defined by the pipeline in the processing service + and should be published in the Pipeline's info response.
+ """ + + pass + + class PipelineRequest(pydantic.BaseModel): pipeline: str source_images: list[SourceImageRequest] - config: dict + config: PipelineRequestConfigParameters | dict | None = None class PipelineResultsResponse(pydantic.BaseModel): diff --git a/ami/ml/tests.py b/ami/ml/tests.py index d48e0c3ae..566f489ce 100644 --- a/ami/ml/tests.py +++ b/ami/ml/tests.py @@ -574,6 +574,30 @@ def test_yes_reprocess_if_new_terminal_algorithm_same_intermediate(self): remaining_images_to_process = len(images_again) self.assertEqual(remaining_images_to_process, len(images), "Images not re-processed with new pipeline") + def test_project_pipeline_config(self): + """ + Test the default_config for a pipeline, as well as the project pipeline config. + Ensure the project pipeline parameters override the pipeline defaults. + """ + from ami.ml.models import ProjectPipelineConfig + from ami.ml.schemas import PipelineRequestConfigParameters + + # Add config to the pipeline & project + self.pipeline.default_config = PipelineRequestConfigParameters({"test_param": "test_value"}) + self.pipeline.save() + self.project_pipeline_config = ProjectPipelineConfig.objects.create( + project=self.project, + pipeline=self.pipeline, + config={"test_param": "project_value"}, + ) + self.project_pipeline_config.save() + + # Check the final config + default_config = self.pipeline.get_config() + self.assertEqual(default_config["test_param"], "test_value") + final_config = self.pipeline.get_config(self.project.pk) + self.assertEqual(final_config["test_param"], "project_value") + class TestAlgorithmCategoryMaps(TestCase): def setUp(self): diff --git a/processing_services/example/api/schemas.py b/processing_services/example/api/schemas.py index def01730a..f06e61441 100644 --- a/processing_services/example/api/schemas.py +++ b/processing_services/example/api/schemas.py @@ -187,10 +187,23 @@ class Config: PipelineChoice = typing.Literal["random", "constant"] +class PipelineRequestConfigParameters(dict): + 
"""Parameters used to configure a pipeline request. + + Accepts any serializable key-value pair. + Example: {"force_reprocess": True, "auth_token": "abc123"} + + Supported parameters are defined by the pipeline in the processing service + and should be published in the Pipeline's info response. + """ + + pass + + class PipelineRequest(pydantic.BaseModel): pipeline: PipelineChoice source_images: list[SourceImageRequest] - config: dict + config: PipelineRequestConfigParameters | dict | None = None # Example for API docs: class Config: @@ -203,6 +216,7 @@ class Config: "url": "https://archive.org/download/mma_various_moths_and_butterflies_54143/54143.jpg", } ], + "config": {"force_reprocess": True, "auth_token": "abc123"}, } } From bca808aebc6abc3b82de57f7ad90638103259eab Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 19 Mar 2025 16:57:58 -0700 Subject: [PATCH 07/10] fix: specify real schema for config params in example API --- processing_services/example/api/schemas.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/processing_services/example/api/schemas.py b/processing_services/example/api/schemas.py index f06e61441..d64690432 100644 --- a/processing_services/example/api/schemas.py +++ b/processing_services/example/api/schemas.py @@ -187,7 +187,7 @@ class Config: PipelineChoice = typing.Literal["random", "constant"] -class PipelineRequestConfigParameters(dict): +class PipelineRequestConfigParameters(pydantic.BaseModel): """Parameters used to configure a pipeline request. Accepts any serializable key-value pair. @@ -197,7 +197,14 @@ class PipelineRequestConfigParameters(dict): and should be published in the Pipeline's info response. 
""" - pass + force_reprocess: bool = pydantic.Field( + default=False, + description="Force reprocessing of the image, even if it has already been processed.", + ) + auth_token: str | None = pydantic.Field( + default=None, + description="An optional authentication token to use for the pipeline.", + ) class PipelineRequest(pydantic.BaseModel): From f574d6ee04e8111438dd5616b37e69c8042aa6ef Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 19 Mar 2025 16:58:18 -0700 Subject: [PATCH 08/10] chore: log final config being sent to the pipeline --- ami/ml/models/pipeline.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index 17f67632c..ea00bb098 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -203,10 +203,13 @@ def process_images( if not project_id: task_logger.warning(f"Pipeline {pipeline} is not associated with a project") + config = pipeline.get_config(project_id=project_id) + task_logger.info(f"Using pipeline config: {config}") + request_data = PipelineRequest( pipeline=pipeline.slug, source_images=source_images, - config=pipeline.get_config(project_id=project_id), + config=config, ) session = create_session() From 6c48a7990760901c83c0a3ab6ba34ff07ee40d91 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 27 Mar 2025 22:06:46 -0700 Subject: [PATCH 09/10] docs: explain request-side parameters --- ami/ml/schemas.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ami/ml/schemas.py b/ami/ml/schemas.py index b248f28eb..4a653d970 100644 --- a/ami/ml/schemas.py +++ b/ami/ml/schemas.py @@ -152,6 +152,12 @@ class PipelineRequestConfigParameters(dict): Supported parameters are defined by the pipeline in the processing service and should be published in the Pipeline's info response. + + Parameters that are used by Antenna before sending the request to the Processing Service + should be prefixed with "request_". 
+ Example: {"request_source_image_batch_size": 8} + Such parameters need to be ignored by the schema in the Processing Service, or + removed before sending the request to the Processing Service. """ pass From ba7acfdfd1ae4072c1c8e164aa1a0336f47c5218 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 27 Mar 2025 22:07:19 -0700 Subject: [PATCH 10/10] chore: lower log level --- ami/ml/models/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index ea00bb098..7ce09b134 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -100,7 +100,7 @@ def filter_processed_images( ) # log all algorithms that are in the pipeline but not in the detection missing_algos = pipeline_algorithm_ids - detection_algorithm_ids - task_logger.info(f"Image #{image.pk} needs classification by pipeline's algorithms: {missing_algos}") + task_logger.debug(f"Image #{image.pk} needs classification by pipeline's algorithms: {missing_algos}") yield image else: # If all detections have been classified by the pipeline, skip the image