From 0f13b65eaf2ffc907090b01b30a8f9a479a40e27 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Sun, 4 Aug 2024 17:40:52 +0200 Subject: [PATCH 01/28] clean task queue --- controller/task_queue/handler/__init__.py | 0 .../handler/attribute_calculation.py | 58 ----- controller/task_queue/handler/embedding.py | 95 ------- .../task_queue/handler/information_source.py | 62 ----- controller/task_queue/handler/macro.py | 44 ---- .../task_queue/handler/markdown_file.py | 53 ---- .../handler/parse_cognition_tmp_file.py | 44 ---- controller/task_queue/handler/task_queue.py | 106 -------- controller/task_queue/handler/tokenization.py | 56 ---- controller/task_queue/manager.py | 132 +--------- controller/task_queue/task_queue.py | 242 ------------------ controller/task_queue/util.py | 16 -- submodules/model | 2 +- 13 files changed, 2 insertions(+), 908 deletions(-) delete mode 100644 controller/task_queue/handler/__init__.py delete mode 100644 controller/task_queue/handler/attribute_calculation.py delete mode 100644 controller/task_queue/handler/embedding.py delete mode 100644 controller/task_queue/handler/information_source.py delete mode 100644 controller/task_queue/handler/macro.py delete mode 100644 controller/task_queue/handler/markdown_file.py delete mode 100644 controller/task_queue/handler/parse_cognition_tmp_file.py delete mode 100644 controller/task_queue/handler/task_queue.py delete mode 100644 controller/task_queue/handler/tokenization.py delete mode 100644 controller/task_queue/task_queue.py delete mode 100644 controller/task_queue/util.py diff --git a/controller/task_queue/handler/__init__.py b/controller/task_queue/handler/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/controller/task_queue/handler/attribute_calculation.py b/controller/task_queue/handler/attribute_calculation.py deleted file mode 100644 index f7b6d713..00000000 --- a/controller/task_queue/handler/attribute_calculation.py +++ /dev/null @@ -1,58 +0,0 @@ -from 
typing import Any, Dict, Tuple, Callable -from controller.attribute import manager as attribute_manager -from submodules.model.business_objects import ( - task_queue as task_queue_db_bo, - general, - attribute as attribute_db_bo, -) -from submodules.model.enums import AttributeState, DataTypes -from ..util import if_task_queue_send_websocket -from controller.tokenization.tokenization_service import ( - request_reupload_docbins, -) - - -def get_task_functions() -> Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 1 - - -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - project_id = task["project_id"] - attribute_id = task["task_info"]["attribute_id"] - # check attribute still exists - - attribute_item = attribute_db_bo.get(project_id, attribute_id) - if attribute_item is None: - return False - task_db_obj.is_active = True - general.commit() - if_task_queue_send_websocket( - task["task_info"], f"ATTRIBUTE:{attribute_id}:{attribute_item.name}" - ) - - attribute_manager.calculate_user_attribute_all_records( - project_id, task["created_by"], attribute_id - ) - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - project_id = task["project_id"] - attribute_id = task["task_info"]["attribute_id"] - attribute_item = attribute_db_bo.get(project_id, attribute_id) - if attribute_item is None: - return True - if attribute_item.state == AttributeState.FAILED.value: - return True - if attribute_item.state == AttributeState.USABLE.value: - if attribute_item.data_type == DataTypes.TEXT.value: - return attribute_db_bo.is_attribute_tokenization_finished( - project_id, attribute_id - ) - else: - request_reupload_docbins(project_id) - return True diff --git a/controller/task_queue/handler/embedding.py b/controller/task_queue/handler/embedding.py deleted file mode 100644 index f353f0ce..00000000 --- 
a/controller/task_queue/handler/embedding.py +++ /dev/null @@ -1,95 +0,0 @@ -from typing import Any, Dict, Tuple, Callable -from controller.embedding import manager as embedding_manager -from submodules.model import enums -from submodules.model.business_objects import ( - agreement as agreement_db_bo, - task_queue as task_queue_db_bo, - embedding as embedding_db_bo, - general, -) -from submodules.model.enums import EmbeddingState -from ..util import if_task_queue_send_websocket - -TASK_DONE_STATES = [EmbeddingState.FINISHED.value, EmbeddingState.FAILED.value] - - -def get_task_functions() -> Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 5 - - -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - project_id = task["project_id"] - - # check embedding already exists - embedding_item = embedding_db_bo.get_embedding_id_and_type( - project_id, task["task_info"]["embedding_name"] - ) - if embedding_item is not None: - task_queue_db_bo.remove_task_from_queue(project_id, task["id"], True) - return False - task_db_obj.is_active = True - general.commit() - - user_id = task["created_by"] - attribute_id = task["task_info"]["attribute_id"] - embedding_type = task["task_info"]["embedding_type"] - embedding_name = task["task_info"]["embedding_name"] - platform = task["task_info"]["platform"] - model = task["task_info"]["model"] - api_token = task["task_info"]["api_token"] - - terms_text = task["task_info"]["terms_text"] - terms_accepted = task["task_info"]["terms_accepted"] - - filter_attributes = task["task_info"]["filter_attributes"] - additional_data = task["task_info"]["additional_data"] - embedding_item = embedding_db_bo.create( - project_id, - attribute_id, - embedding_name, - user_id, - enums.EmbeddingState.INITIALIZING.value, - type=embedding_type, - model=model, - platform=platform, - api_token=api_token, 
- filter_attributes=filter_attributes, - additional_data=additional_data, - ) - if ( - platform == enums.EmbeddingPlatform.OPENAI.value - or platform == enums.EmbeddingPlatform.COHERE.value - or platform == enums.EmbeddingPlatform.AZURE.value - ): - agreement_db_bo.create( - project_id, - user_id, - terms_text, - terms_accepted, - xfkey=embedding_item.id, - xftype=enums.AgreementType.EMBEDDING.value, - ) - - general.commit() - embedding_id = str(embedding_item.id) - if_task_queue_send_websocket( - task["task_info"], f"EMBEDDING:{embedding_id}:{embedding_name}" - ) - - embedding_manager.create_embedding(project_id, embedding_id) - - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - embedding_item = embedding_db_bo.get_embedding_by_name( - task["project_id"], task["task_info"]["embedding_name"] - ) - # if it doesn't exists anymore, it means it was deleted -> we are done with the task - if embedding_item is None: - return True - return embedding_item.state in TASK_DONE_STATES diff --git a/controller/task_queue/handler/information_source.py b/controller/task_queue/handler/information_source.py deleted file mode 100644 index 826f2c7d..00000000 --- a/controller/task_queue/handler/information_source.py +++ /dev/null @@ -1,62 +0,0 @@ -from typing import Any, Dict, Tuple, Callable -from controller.payload import manager as payload_manager -from controller.zero_shot import manager as zero_shot_manager -from submodules.model.business_objects import ( - task_queue as task_queue_db_bo, - general, - information_source as information_source_db_bo, -) -from submodules.model.enums import PayloadState, InformationSourceType -from ..util import if_task_queue_send_websocket - -TASK_DONE_STATES = [PayloadState.FINISHED.value, PayloadState.FAILED.value] - - -def get_task_functions() -> Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 1 - - -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = 
task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - project_id = task["project_id"] - information_source_id = task["task_info"]["information_source_id"] - # check information source still exists - - is_item = information_source_db_bo.get(project_id, information_source_id) - if is_item is None: - return False - task_db_obj.is_active = True - general.commit() - user_id = task["created_by"] - payload_id = None - if is_item.type == InformationSourceType.ZERO_SHOT.value: - payload_id = zero_shot_manager.start_zero_shot_for_project_thread( - project_id, information_source_id, user_id - ) - else: - payload = payload_manager.create_payload( - project_id, information_source_id, user_id - ) - payload_id = str(payload.id) - task["task_info"]["payload_id"] = payload_id - if_task_queue_send_websocket( - task["task_info"], - f"INFORMATION_SOURCE:{information_source_id}:{payload_id}:{is_item.name}", - ) - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - project_id = task["project_id"] - information_source_id = task["task_info"]["information_source_id"] - is_item = information_source_db_bo.get(project_id, information_source_id) - if is_item is None: - return True - payload_id = task["task_info"]["payload_id"] - payload_item = information_source_db_bo.get_payload(project_id, payload_id) - if payload_item is None: - return True - return payload_item.state in TASK_DONE_STATES diff --git a/controller/task_queue/handler/macro.py b/controller/task_queue/handler/macro.py deleted file mode 100644 index be094432..00000000 --- a/controller/task_queue/handler/macro.py +++ /dev/null @@ -1,44 +0,0 @@ -from typing import Any, Dict, Tuple, Callable -import os - -import requests -from submodules.model.business_objects import ( - task_queue as task_queue_db_bo, - general, -) - -from submodules.model.cognition_objects import macro as macro_db_bo - -BASE_URI = os.getenv("COGNITION_GATEWAY") - - -def get_task_functions() -> 
Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 1 - - -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - - task_db_obj.is_active = True - general.commit() - - action = task["task_info"] - - macro_id = action["macro_id"] - execution_id = action["execution_id"] - group_id = action.get("execution_group_id") - requests.put( - f"{BASE_URI}/api/v1/converters/internal/macros/{macro_id}/execution/{execution_id}/start?group_execution_id={group_id}" - ) - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - - action = task["task_info"] - return macro_db_bo.macro_execution_finished( - action["macro_id"], action["execution_id"], action["execution_group_id"] - ) diff --git a/controller/task_queue/handler/markdown_file.py b/controller/task_queue/handler/markdown_file.py deleted file mode 100644 index bfc43780..00000000 --- a/controller/task_queue/handler/markdown_file.py +++ /dev/null @@ -1,53 +0,0 @@ -from typing import Any, Dict, Tuple, Callable -import os - -import requests -from submodules.model.business_objects import ( - task_queue as task_queue_db_bo, - general, -) -from submodules.model.cognition_objects import ( - markdown_file as markdown_file_db_bo, -) -from submodules.model.enums import CognitionMarkdownFileState - -BASE_URI = os.getenv("COGNITION_GATEWAY") - -TASK_DONE_STATES = [ - CognitionMarkdownFileState.FINISHED.value, - CognitionMarkdownFileState.FAILED.value, -] - - -def get_task_functions() -> Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 1 - - -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - - action = task["task_info"] - org_id = action["org_id"] - dataset_id = action["dataset_id"] - file_id = 
action["file_id"] - - task_db_obj.is_active = True - general.commit() - requests.post( - f"{BASE_URI}/api/v1/converters/internal/datasets/{dataset_id}/files/{file_id}/parse", - json={"orgId": org_id}, - ) - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - action = task["task_info"] - org_id = action["org_id"] - file_id = action["file_id"] - markdown_file_entity = markdown_file_db_bo.get(org_id=org_id, md_file_id=file_id) - if markdown_file_entity is None: - return True - return markdown_file_entity.state in TASK_DONE_STATES diff --git a/controller/task_queue/handler/parse_cognition_tmp_file.py b/controller/task_queue/handler/parse_cognition_tmp_file.py deleted file mode 100644 index ca9329bb..00000000 --- a/controller/task_queue/handler/parse_cognition_tmp_file.py +++ /dev/null @@ -1,44 +0,0 @@ -from typing import Any, Dict, Tuple, Callable -import os -import submodules.s3.controller as s3 - -import requests -from submodules.model.business_objects import ( - task_queue as task_queue_db_bo, - general, -) - -BASE_URI = os.getenv("COGNITION_GATEWAY") - - -def get_task_functions() -> Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 1 - - -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - - task_db_obj.is_active = True - general.commit() - - action = task["task_info"] - conversation_id = action["conversation_id"] - cognition_project_id = action["cognition_project_id"] - requests.post( - f"{BASE_URI}/api/v1/converters/internal/projects/{cognition_project_id}/conversation/{conversation_id}/parse-tmp-file", - json={ - "minio_path": action["minio_path"], - "bucket": action["bucket"], - }, - ) - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - - action = task["task_info"] - - return not s3.object_exists(action["bucket"], action["minio_path"]) diff --git 
a/controller/task_queue/handler/task_queue.py b/controller/task_queue/handler/task_queue.py deleted file mode 100644 index 7fa54986..00000000 --- a/controller/task_queue/handler/task_queue.py +++ /dev/null @@ -1,106 +0,0 @@ -from typing import Any, Dict, Tuple, Callable -from submodules.model.business_objects import ( - task_queue as task_queue_db_bo, - project as project_db_bo, -) - -from util import notification - -from submodules.model.enums import TaskType, InformationSourceType - -from .. import manager - - -def get_task_functions() -> Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 1 - - -# task queues work a bit different from others since they are just wrapper around tasks in order -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - if len(task["task_info"]) == 0: - return False - if not isinstance(task["task_info"], list): - raise ValueError("Something is wrong") - task["initial_count"] = len(task["task_info"]) - task["done_count"] = 0 - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - project_id = task["project_id"] - if not project_db_bo.get(project_id): - # parent project was deleted so the queue item is done - return True - sub_task_id = task.get("sub_task_id") - if not sub_task_id: - # no sub task yet - __start_sub_task(task) - return False - if sub_task_id != TaskType.TASK_QUEUE_ACTION.value: - # special "id" for task queue actions since they don't have an actual id/db entry - sub_task_item = task_queue_db_bo.get(sub_task_id) - if sub_task_item: - # current sub task still running - return False - - if len(task["task_info"]) > 0: - # still further sub task items - task["done_count"] += 1 - __send_websocket_progress(task) - __start_sub_task(task) - return False - - __send_websocket_progress(task) - return True - - -def __start_sub_task(task: Dict[str, Any]) -> str: 
- user_id = task["created_by"] - - next_entry = task["task_info"].pop(0) - task_type = next_entry.get("task_type") - # if a specific project id is given for the item we use that otherwise the one from the task - # can e.g. happen with cognition init tasks - project_id = next_entry.get("project_id", task["project_id"]) - del next_entry["task_type"] - - try: - task_type_parsed = TaskType[task_type.upper()] - except KeyError: - raise ValueError(f"Invalid Task Type: {task_type}") - - priority = False - if task_type_parsed == TaskType.ATTRIBUTE_CALCULATION: - priority = True - elif task_type_parsed == TaskType.INFORMATION_SOURCE: - priority = ( - next_entry.get("source_type") != InformationSourceType.ZERO_SHOT.value - ) - elif task_type_parsed == TaskType.TASK_QUEUE_ACTION: - next_entry = next_entry.get("action") - next_entry["parent_task_queue_id"] = task["id"] - task_id, _ = manager.add_task( - project_id, task_type_parsed, user_id, next_entry, priority - ) - - task["sub_task_id"] = task_id - task_queue_db_bo.update_task_info(task["id"], task["task_info"], True) - - -def __send_websocket_progress(task: Dict[str, Any]) -> None: - project_id = task["project_id"] - task_id = task["id"] - if len(task["task_info"]) == 0: - notification.send_organization_update( - project_id, f"task_queue:{task_id}:state:DONE" - ) - else: - progress = round(task["done_count"] / task["initial_count"], 4) - - notification.send_organization_update( - project_id, f"task_queue:{task_id}:progress:{progress}" - ) diff --git a/controller/task_queue/handler/tokenization.py b/controller/task_queue/handler/tokenization.py deleted file mode 100644 index 7b6c8c7a..00000000 --- a/controller/task_queue/handler/tokenization.py +++ /dev/null @@ -1,56 +0,0 @@ -from typing import Any, Dict, Tuple, Callable -from submodules.model.business_objects import ( - task_queue as task_queue_db_bo, - general, - attribute as attribute_db_bo, -) -from controller.tokenization import tokenization_service -from 
submodules.model.business_objects.tokenization import ( - is_doc_bin_creation_running_or_queued, -) -from submodules.model.enums import RecordTokenizationScope -from ..util import if_task_queue_send_websocket - - -def get_task_functions() -> Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 2 - - -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - project_id = task["project_id"] - - if task["task_info"]["scope"] == RecordTokenizationScope.ATTRIBUTE.value: - attribute_item = attribute_db_bo.get( - project_id, task["task_info"]["attribute_id"] - ) - if attribute_item is None: - task_queue_db_bo.remove_task_from_queue(project_id, task["id"], True) - return False - task_db_obj.is_active = True - general.commit() - if_task_queue_send_websocket(task["task_info"], f"TOKENIZATION") - - if task["task_info"]["scope"] == RecordTokenizationScope.PROJECT.value: - tokenization_service.request_tokenize_project( - project_id, - task["created_by"], - task["task_info"]["include_rats"], - task["task_info"]["only_uploaded_attributes"], - ) - else: - tokenization_service.request_tokenize_calculated_attribute( - project_id, - task["created_by"], - task["task_info"]["attribute_id"], - task["task_info"]["include_rats"], - ) - - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - return not is_doc_bin_creation_running_or_queued(task["project_id"], True) diff --git a/controller/task_queue/manager.py b/controller/task_queue/manager.py index 1e66987e..2611e749 100644 --- a/controller/task_queue/manager.py +++ b/controller/task_queue/manager.py @@ -1,60 +1,10 @@ -from typing import Any, List, Dict, Tuple, Callable, Union +from typing import Any, List, Dict import copy - from submodules.model import enums - - from submodules.model.business_objects import ( task_queue as task_queue_db_bo, - embedding as 
embedding_db_bo, - information_source as information_source_db_bo, ) from submodules.model.models import TaskQueue as TaskQueueDBObj -from .handler import ( - embedding as embedding_handler, - information_source as information_source_handler, - tokenization as tokenization_handler, - attribute_calculation as attribute_calculation_handler, - task_queue as task_queue_handler, - markdown_file as markdown_file_handler, - parse_cognition_tmp_file as cognition_tmp_file_handler, - macro as macro_handler, -) -from .util import if_task_queue_send_websocket - -from controller.task_queue import task_queue -from controller.data_slice import manager as data_slice_manager -from controller.gates import manager as gates_manager -from controller.weak_supervision import manager as weak_supervision_manager -from controller.transfer.cognition.import_wizard import finish_cognition_setup -from util import notification, daemon - - -def add_task( - project_id: str, - task_type: enums.TaskType, - user_id: str, - task_info: Union[Dict[str, str], List[Dict[str, str]]], - priority: bool = False, -) -> Tuple[str, int]: - if task_type == enums.TaskType.TASK_QUEUE and not isinstance(task_info, list): - raise ValueError("Task queues only work with list of singular task items") - elif task_type == enums.TaskType.TASK_QUEUE and not len(task_info): - raise ValueError("Task queues need at least one item") - elif task_type != enums.TaskType.TASK_QUEUE and not isinstance(task_info, dict): - raise ValueError("Queue entries only accept dicts") - - if task_type == enums.TaskType.TASK_QUEUE_ACTION: - # just execute the action - __execute_action(project_id, user_id, task_info) - return enums.TaskType.TASK_QUEUE_ACTION.value, 0 - - task_item = task_queue_db_bo.add( - project_id, task_type, user_id, task_info, priority, with_commit=True - ) - task_id = str(task_item.id) - queue_position = add_task_to_task_queue(task_item) - return task_id, queue_position def get_all_waiting_by_type(project_id: str, task_type: 
str) -> List[TaskQueueDBObj]: @@ -79,86 +29,6 @@ def parse_task_to_dict(task: TaskQueueDBObj) -> Dict[str, Any]: } -def get_task_function_by_type(task_type: str) -> Tuple[Callable, Callable, int]: - if task_type == enums.TaskType.EMBEDDING.value: - return embedding_handler.get_task_functions() - if task_type == enums.TaskType.INFORMATION_SOURCE.value: - return information_source_handler.get_task_functions() - if task_type == enums.TaskType.TOKENIZATION.value: - return tokenization_handler.get_task_functions() - if task_type == enums.TaskType.ATTRIBUTE_CALCULATION.value: - return attribute_calculation_handler.get_task_functions() - if task_type == enums.TaskType.TASK_QUEUE.value: - return task_queue_handler.get_task_functions() - if task_type == enums.TaskType.PARSE_MARKDOWN_FILE.value: - return markdown_file_handler.get_task_functions() - if task_type == enums.TaskType.PARSE_COGNITION_TMP_FILE.value: - return cognition_tmp_file_handler.get_task_functions() - if task_type == enums.TaskType.RUN_COGNITION_MACRO.value: - return macro_handler.get_task_functions() - raise ValueError(f"Task type {task_type} not supported yet") - - -def add_task_to_task_queue(task: TaskQueueDBObj) -> int: - start_func, check_func, check_every = get_task_function_by_type(task.task_type) - queue = None - if ( - task.task_type == enums.TaskType.TASK_QUEUE.value - or task.task_type == enums.TaskType.RUN_COGNITION_MACRO.value - ): - # macros have tasks (e.g. 
etl parsing) inside so the execution shouldn't block own queue items - queue = task_queue.get_task_queue_queue() - else: - queue = task_queue.get_task_queue() - return queue.add_task(task, start_func, check_func, check_every) - - def remove_task_from_queue(project_id: str, task_id: str) -> None: task_queue_db_bo.remove_task_from_queue(project_id, task_id, with_commit=True) # no need to dequeue from tasks since the initial check should handle it - - -def __execute_action(project_id: str, user_id: str, action: Dict[str, Any]) -> None: - action_type = action.get("action_type") - if action_type == enums.TaskQueueAction.CREATE_OUTLIER_SLICE.value: - embedding_name = action.get("embedding_name") - embedding_item = embedding_db_bo.get_embedding_by_name( - project_id, embedding_name - ) - if not embedding_item: - raise ValueError(f"Unknown embedding {embedding_name}") - - data_slice_manager.create_outlier_slice( - project_id, user_id, str(embedding_item.id) - ) - elif action_type == enums.TaskQueueAction.START_GATES.value: - # id overwrite since a queue can hold different project ids (e.g. 
wizard) - project_id = action.get("project_id") - if not project_id: - raise ValueError("Missing project id") - if_task_queue_send_websocket( - action, f"{enums.TaskQueueAction.START_GATES.value}:{project_id}" - ) - state = enums.PayloadState.FINISHED.value - if not gates_manager.start_gates_container(project_id): - state = enums.PayloadState.FAILED.value - notification.send_organization_update(project_id, f"gates:startup:{state}") - - elif action_type == enums.TaskQueueAction.SEND_WEBSOCKET.value: - org_id = action.get("organization_id") - notification.send_organization_update( - project_id, action.get("message", ""), organization_id=org_id - ) - elif action_type == enums.TaskQueueAction.FINISH_COGNITION_SETUP.value: - daemon.run(finish_cognition_setup, action.get("cognition_project_id")) - elif action_type == enums.TaskQueueAction.RUN_WEAK_SUPERVISION.value: - information_source_db_bo.update_is_selected_for_project(project_id, False) - information_source_db_bo.update_is_selected_for_project( - project_id, - True, - with_commit=True, - only_with_state=enums.PayloadState.FINISHED, - ) - weak_supervision_manager.run_weak_supervision(project_id, user_id) - else: - raise ValueError(f"Invalid action type: {action_type}") diff --git a/controller/task_queue/task_queue.py b/controller/task_queue/task_queue.py deleted file mode 100644 index 81d6e5b9..00000000 --- a/controller/task_queue/task_queue.py +++ /dev/null @@ -1,242 +0,0 @@ -from threading import Lock, Thread -from typing import Tuple, Dict, Any, Callable -import os -import time -from submodules.model.models import TaskQueue as TaskQueueDBObj -from . 
import manager -from submodules.model.business_objects import general, task_queue as task_queue_db_bo -from submodules.model.cognition_objects import macro as macro_db_bo -import traceback -from submodules.model.enums import TaskType, MacroExecutionState - - -# custom class wrapping a list in order to make it thread safe -class ThreadSafeList: - def __init__(self): - self._list = list() - self._lock = Lock() - - def append(self, value): - with self._lock: - self._list.append(value) - - def extend(self, value): - with self._lock: - self._list.extend(value) - - def pop(self, index: int = -1): - with self._lock: - return self._list.pop(index) - - def get(self, index: int): - with self._lock: - return self._list[index] - - def length(self): - with self._lock: - return len(self._list) - - def print(self): - with self._lock: - return print(self._list, flush=True) - - -class CustomTaskQueue: - class TaskInfo: - def __init__( - self, - task: TaskQueueDBObj, - start_function: Callable[[Dict[str, Any]], bool], - check_finished_function: Callable[[Dict[str, Any]], bool], - check_every: int, - ) -> None: - self.task_dict = manager.parse_task_to_dict(task) - self.start_function = start_function - self.check_finished_function = check_finished_function - self.check_every = check_every - - def __init__(self, max_normal: int, max_priority: int) -> None: - self._lock = Lock() - self._max_normal = max_normal - self._fifo_queue_normal = ThreadSafeList() - self._active_normal = ThreadSafeList() - self._max_priority = max_priority - self._fifo_queue_priority = ThreadSafeList() - self._active_priority = ThreadSafeList() - self._checker_thread = Thread( - target=self.__thread_checker, - daemon=True, - ) - self._checker_thread.start() - - def add_task( - self, - task: TaskQueueDBObj, - start_function: Callable[[Dict[str, Any]], bool], - check_finished_function: Callable[[Dict[str, Any]], bool], - check_every: int = 1, - ) -> int: - # return queue position or 0 if directly started - with 
self._lock: - active = self._active_priority if task.priority else self._active_normal - max = self._max_priority if task.priority else self._max_normal - - task_info = self.TaskInfo( - task, start_function, check_finished_function, check_every - ) - add_to_prio = task.priority - # ensure prio tasks can be added to none prio queue if fits - if active.length() >= max: - if task.priority and self._active_normal.length() < self._max_normal: - add_to_prio = False - else: - append_to = ( - self._fifo_queue_priority - if task.priority - else self._fifo_queue_normal - ) - append_to.append(task_info) - return append_to.length() - self.__start_task(task_info, add_to_prio) - return 0 - - def __start_task(self, task_info: TaskInfo, to_prio: bool) -> bool: - if not task_info: - return False - - active = self._active_priority if to_prio else self._active_normal - # since multiple gateway container can exist the start can "fail" if the task is already active - try: - if not task_info.start_function(task_info.task_dict): - return False - active.append(task_info) - task_info.task_dict["is_active"] = True - except Exception: - print( - f"Task start of task failed (task info->{task_info.task_dict}) -> deleting from db...", - flush=True, - ) - task_queue_db_bo.remove_task_from_queue( - task_info.task_dict["project_id"], - task_info.task_dict["id"], - with_commit=True, - ) - - print("done...\nError:", flush=True) - print(traceback.format_exc(), flush=True) - return False - return True - - def __thread_checker(self) -> None: - ctx_token = general.get_ctx_token() - - seconds = 0 - while True: - seconds += 1 - if seconds >= 120: - seconds = 0 - ctx_token = general.remove_and_refresh_session(ctx_token, True) - try: - self.__check_task_queue(True, seconds) - self.__check_task_queue(False, seconds) - except Exception: - print(traceback.format_exc(), flush=True) - time.sleep(1) - - def __check_task_queue(self, priority: bool, seconds: int) -> None: - to_check = self._active_priority if 
priority else self._active_normal - - to_remove = [] - for idx in range(to_check.length()): - task = to_check.get(idx) - if seconds % task.check_every == 0: - if task.check_finished_function(task.task_dict): - task.task_dict["is_active"] = False - to_remove.append(idx) - - # cleanup & requeue existing tasks - for idx in reversed(to_remove): - finished = to_check.pop(idx) - task_queue_db_bo.remove_task_from_queue( - finished.task_dict["project_id"], finished.task_dict["id"] - ) - next_task, p = None, False - while not self.__start_task(next_task, p): - next_task, p = self.__get_next_item(priority) - if not next_task: - break - if len(to_remove) > 0: - general.commit() - - def __get_next_item(self, priority: bool) -> Tuple[TaskInfo, bool]: - # normal queue can solve priority tasks but not the other way around - if priority: - if self._fifo_queue_priority.length() > 0: - return self._fifo_queue_priority.pop(0), True - else: - if self._fifo_queue_normal.length() > 0: - return self._fifo_queue_normal.pop(0), False - if self._fifo_queue_priority.length() > 0: - return self._fifo_queue_priority.pop(0), False - return None, False - - -# global task queue -task_queue = None - -# thread wrapper for tasks item of type TASK_QUEUE so the queue doesn't block an actual calculation slot -task_queue_queue = None - - -def init_task_queues() -> CustomTaskQueue: - global task_queue, task_queue_queue - # init task queue class - max_normal = int(os.getenv("TASK_QUEUE_SLOTS", "2")) - max_priority = int(os.getenv("PRIORITY_TASK_QUEUE_SLOTS", "1")) - task_queue = CustomTaskQueue(max_normal, max_priority) - # double amount as normal so multiple queues can be handled simultaneously if the server can handle it - # tested locally with two wizards which resulted in reasonable performance (ping pong between the queues) - task_queue_queue = CustomTaskQueue(max_normal * 2, 0) - # reset old tasks that weren't finished properly - task_queue_db_bo.set_all_tasks_inactive(True) - - # queue tasks - 
existing_tasks = task_queue_db_bo.get_all_tasks() - for task in existing_tasks: - start_func, check_func, check_every = manager.get_task_function_by_type( - task.task_type - ) - if task.task_type == TaskType.TASK_QUEUE.value: - task_queue_queue.add_task(task, start_func, check_func, check_every) - elif task.task_type == TaskType.RUN_COGNITION_MACRO.value: - # macros that were running when the server was restarted are set to failed since we dont have pointers to the running process - item = macro_db_bo.get_macro_execution( - task.task_info["execution_id"], - task.task_info["execution_group_id"], - MacroExecutionState.RUNNING, - ) - if item is not None: - item.state = MacroExecutionState.FAILED.value - task_queue_db_bo.remove_task_from_queue(task.project_id, task.id) - continue - task_queue_queue.add_task(task, start_func, check_func, check_every) - else: - task_queue.add_task(task, start_func, check_func, check_every) - general.commit() - return task_queue - - -def get_task_queue() -> CustomTaskQueue: - global task_queue - if task_queue is None: - raise Exception("Task queue not initialized") - - return task_queue - - -def get_task_queue_queue() -> CustomTaskQueue: - global task_queue_queue - if task_queue_queue is None: - raise Exception("Task queue not initialized") - - return task_queue_queue diff --git a/controller/task_queue/util.py b/controller/task_queue/util.py deleted file mode 100644 index 8da74ec6..00000000 --- a/controller/task_queue/util.py +++ /dev/null @@ -1,16 +0,0 @@ -from util import notification - -from typing import Any, Dict - - -def if_task_queue_send_websocket( - task: Dict[str, Any], msg: str, info_type: str = "START" -) -> None: - task_queue_id = task.get("parent_task_queue_id") - if not task_queue_id: - return - - project_id = task["project_id"] - notification.send_organization_update( - project_id, f"task_queue:{task_queue_id}:{info_type}:{msg}" - ) diff --git a/submodules/model b/submodules/model index dfb42793..2f8df4be 160000 --- 
a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit dfb42793e0a5178c9a793be69b08f696c46fd27d +Subproject commit 2f8df4be74ba6b8142a6ae41737068b5d8dd9d7c From 711af526a4ec8cf0a58a2cf809e6f1e97466f993 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Mon, 5 Aug 2024 09:46:25 +0200 Subject: [PATCH 02/28] remove task endpoints --- api/transfer.py | 82 ------------------------------------------------ app.py | 12 ------- submodules/model | 2 +- 3 files changed, 1 insertion(+), 95 deletions(-) diff --git a/api/transfer.py b/api/transfer.py index 0b3bc9f0..ef17c047 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -18,12 +18,10 @@ general, organization, tokenization, - project as refinery_project, ) from submodules.model.cognition_objects import ( project as cognition_project, - macro as macro_db_bo, ) from controller.transfer import manager as transfer_manager @@ -243,86 +241,6 @@ def put(self, request) -> PlainTextResponse: return PlainTextResponse("OK") -class CognitionParseMarkdownFile(HTTPEndpoint): - def post(self, request) -> PlainTextResponse: - refinery_project_id = request.path_params["project_id"] - refinery_project_item = refinery_project.get(refinery_project_id) - if not refinery_project_item: - return PlainTextResponse("Bad project id", status_code=400) - - dataset_id = request.path_params["dataset_id"] - file_id = request.path_params["file_id"] - - # via thread to ensure the endpoint returns immediately - - daemon.run( - CognitionParseMarkdownFile.__add_parse_markdown_file_thread, - refinery_project_id, - str(refinery_project_item.created_by), - { - "org_id": str(refinery_project_item.organization_id), - "dataset_id": dataset_id, - "file_id": file_id, - }, - ) - - return PlainTextResponse("OK") - - def __add_parse_markdown_file_thread( - project_id: str, user_id: str, task_info: Dict[str, str] - ): - - ctx_token = general.get_ctx_token() - try: - task_queue_manager.add_task( - project_id, TaskType.PARSE_MARKDOWN_FILE, user_id, 
task_info - ) - finally: - general.remove_and_refresh_session(ctx_token, False) - - -class CognitionStartMacroExecutionGroup(HTTPEndpoint): - def put(self, request) -> PlainTextResponse: - macro_id = request.path_params["macro_id"] - group_id = request.path_params["group_id"] - - execution_entries = macro_db_bo.get_all_macro_executions(macro_id, group_id) - - if len(execution_entries) == 0: - return PlainTextResponse("No executions found", status_code=400) - if not (cognition_prj_id := execution_entries[0].meta_info.get("project_id")): - return PlainTextResponse("No project id found", status_code=400) - cognition_prj = cognition_project.get(cognition_prj_id) - refinery_prj_id = str( - refinery_project.get_or_create_queue_project( - cognition_prj.organization_id, cognition_prj.created_by, True - ).id - ) - cached = {str(e.id): str(e.created_by) for e in execution_entries} - - def queue_tasks(): - token = general.get_ctx_token() - try: - for exec_id in cached: - task_queue_manager.add_task( - refinery_prj_id, - TaskType.RUN_COGNITION_MACRO, - cached[exec_id], - { - "macro_id": macro_id, - "execution_id": exec_id, - "execution_group_id": group_id, - }, - ) - general.commit() - finally: - general.remove_and_refresh_session(token, False) - - daemon.run(queue_tasks) - - return PlainTextResponse("OK") - - class AssociationsImport(HTTPEndpoint): async def post(self, request) -> JSONResponse: project_id = request.path_params["project_id"] diff --git a/app.py b/app.py index 57619b2c..164d0f6b 100644 --- a/app.py +++ b/app.py @@ -15,8 +15,6 @@ UploadTaskInfo, CognitionImport, CognitionPrepareProject, - CognitionParseMarkdownFile, - CognitionStartMacroExecutionGroup, ) from fast_api.routes.organization import router as org_router from fast_api.routes.project import router as project_router @@ -41,7 +39,6 @@ from starlette.applications import Starlette from starlette.routing import Route, Mount -from controller.task_queue.task_queue import init_task_queues from 
controller.project.manager import check_in_deletion_projects from route_prefix import ( PREFIX_ORGANIZATION, @@ -135,16 +132,8 @@ "/project/{cognition_project_id:str}/cognition/continue/{task_id:str}/finalize", CognitionPrepareProject, ), - Route( - "/project/{project_id:str}/cognition/datasets/{dataset_id:str}/files/{file_id:str}/queue", - CognitionParseMarkdownFile, - ), Route("/project/{project_id:str}/import/task/{task_id:str}", UploadTaskInfo), Route("/project", ProjectCreationFromWorkflow), - Route( - "/macro/{macro_id:str}/execution-group/{group_id:str}/queue", - CognitionStartMacroExecutionGroup, - ), Route("/is_managed", IsManagedRest), Route("/is_demo", IsDemoRest), Mount("/api", app=fastapi_app, name="REST API"), @@ -156,7 +145,6 @@ middleware = [Middleware(DatabaseSessionHandler)] app = Starlette(routes=routes, middleware=middleware) -init_task_queues() check_in_deletion_projects() security.check_secret_key() clean_up.clean_up_database() diff --git a/submodules/model b/submodules/model index 2f8df4be..9bca0173 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 2f8df4be74ba6b8142a6ae41737068b5d8dd9d7c +Subproject commit 9bca0173235d277c39009020e2ffd05187cb5521 From f7a7ca923c7a62b75e5ef9f8676f019f60ef4584 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Mon, 5 Aug 2024 09:46:31 +0200 Subject: [PATCH 03/28] alembic --- ...5c30d91685_remove_project_id_task_queue.py | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 alembic/versions/af5c30d91685_remove_project_id_task_queue.py diff --git a/alembic/versions/af5c30d91685_remove_project_id_task_queue.py b/alembic/versions/af5c30d91685_remove_project_id_task_queue.py new file mode 100644 index 00000000..17169c2e --- /dev/null +++ b/alembic/versions/af5c30d91685_remove_project_id_task_queue.py @@ -0,0 +1,51 @@ +"""remove project id task queue + +Revision ID: af5c30d91685 +Revises: 0d587af700ce +Create Date: 2024-08-04 15:42:26.747671 + +""" +from 
alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = 'af5c30d91685' +down_revision = '0d587af700ce' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('macro_execution_summary', + sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('organization_id', postgresql.UUID(as_uuid=True), nullable=True), + sa.Column('creation_month', sa.Date(), nullable=True), + sa.Column('macro_type', sa.String(), nullable=True), + sa.Column('execution_count', sa.Integer(), nullable=True), + sa.Column('processed_files_count', sa.Integer(), nullable=True), + sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('organization_id', 'creation_month', 'macro_type', name='unique_macro_summary'), + schema='cognition' + ) + op.create_index(op.f('ix_cognition_macro_execution_summary_creation_month'), 'macro_execution_summary', ['creation_month'], unique=False, schema='cognition') + op.create_index(op.f('ix_cognition_macro_execution_summary_organization_id'), 'macro_execution_summary', ['organization_id'], unique=False, schema='cognition') + op.add_column('task_queue', sa.Column('organization_id', postgresql.UUID(as_uuid=True), nullable=True)) + op.drop_constraint('task_queue_project_id_fkey', 'task_queue', type_='foreignkey') + op.create_foreign_key(None, 'task_queue', 'organization', ['organization_id'], ['id'], ondelete='CASCADE') + op.drop_column('task_queue', 'project_id') + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.add_column('task_queue', sa.Column('project_id', postgresql.UUID(), autoincrement=False, nullable=True)) + op.drop_constraint(None, 'task_queue', type_='foreignkey') + op.create_foreign_key('task_queue_project_id_fkey', 'task_queue', 'project', ['project_id'], ['id'], ondelete='CASCADE') + op.drop_column('task_queue', 'organization_id') + op.drop_index(op.f('ix_cognition_macro_execution_summary_organization_id'), table_name='macro_execution_summary', schema='cognition') + op.drop_index(op.f('ix_cognition_macro_execution_summary_creation_month'), table_name='macro_execution_summary', schema='cognition') + op.drop_table('macro_execution_summary', schema='cognition') + # ### end Alembic commands ### From 2ffd90b93a0e8437b667ef718eeece0dcd798430 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Mon, 5 Aug 2024 13:48:23 +0200 Subject: [PATCH 04/28] add itnernal endpoint dummy --- app.py | 8 ++++++++ fast_api/models.py | 5 +++++ fast_api/routes/attribute.py | 4 ++++ fast_api/routes/task_execution.py | 25 +++++++++++++++++++++++++ route_prefix.py | 1 + 5 files changed, 43 insertions(+) create mode 100644 fast_api/routes/task_execution.py diff --git a/app.py b/app.py index 164d0f6b..652db346 100644 --- a/app.py +++ b/app.py @@ -59,6 +59,7 @@ PREFIX_RECORD, PREFIX_WEAK_SUPERVISION, PREFIX_LABELING_TASKS, + PREFIX_TASK_EXECUTION, ) from util import security, clean_up from middleware import log_storage @@ -113,6 +114,10 @@ labeling_tasks_router, prefix=PREFIX_LABELING_TASKS, tags=["labeling-tasks"] ) +fastapi_app_internal = FastAPI() +fastapi_app_internal.include_router( + task_execution_router, prefix=PREFIX_TASK_EXECUTION, tags=["task-execution"] +) routes = [ Route("/notify/{path:path}", Notify), Route("/healthcheck", Healthcheck), @@ -137,6 +142,9 @@ Route("/is_managed", IsManagedRest), Route("/is_demo", IsDemoRest), Mount("/api", app=fastapi_app, name="REST API"), + Mount( + "/internal/api", app=fastapi_app_internal, name="INTERNAL REST API" + ), # task 
master requesting ] diff --git a/fast_api/models.py b/fast_api/models.py index 40b6e904..dc5e0bdc 100644 --- a/fast_api/models.py +++ b/fast_api/models.py @@ -436,3 +436,8 @@ class CancelTaskBody(BaseModel): project_id: StrictStr task_id: StrictStr task_type: StrictStr + + +class AttributeCalculationTaskExecutionBody(BaseModel): + user_id: StrictStr + attribute_id: StrictStr diff --git a/fast_api/routes/attribute.py b/fast_api/routes/attribute.py index 8675e435..e4395ab7 100644 --- a/fast_api/routes/attribute.py +++ b/fast_api/routes/attribute.py @@ -92,3 +92,7 @@ def delete_user_attribute( ): manager.delete_attribute(project_id, body.attribute_id) return pack_json_result({"data": {"deleteUserAttribute": {"ok": True}}}) + +@router.post( + "{project}" +) \ No newline at end of file diff --git a/fast_api/routes/task_execution.py b/fast_api/routes/task_execution.py new file mode 100644 index 00000000..7840119e --- /dev/null +++ b/fast_api/routes/task_execution.py @@ -0,0 +1,25 @@ +from controller.attribute import manager +from controller.auth import manager as auth_manager +from typing import List, Union +from fast_api.models import AttributeCalculationTaskExecutionBody +from fast_api.routes.client_response import pack_json_result +from fastapi import APIRouter, Body, Depends, Query, Request +from submodules.model.enums import NotificationType +from submodules.model.util import sql_alchemy_to_dict +from util.notification import create_notification + +router = APIRouter() + + +@router.post( + "/{project_id}/attribute-calculation", +) +def calculate_attributes( + project_id: str, + attribute_calculation_task_execution: AttributeCalculationTaskExecutionBody, +): + manager.calculate_user_attribute_all_records( + project_id, + attribute_calculation_task_execution.user_id, + attribute_calculation_task_execution.attribute_id, + ) diff --git a/route_prefix.py b/route_prefix.py index 3215274a..873f6e86 100644 --- a/route_prefix.py +++ b/route_prefix.py @@ -17,3 +17,4 @@ 
PREFIX_RECORD = PREFIX + "/record" PREFIX_WEAK_SUPERVISION = PREFIX + "/weak-supervision" PREFIX_LABELING_TASKS = PREFIX + "/labeling-tasks" +PREFIX_TASK_EXECUTION = PREFIX + "/task-execution" From bc96da41e7e76203021500fac7c3217a9c8c75ba Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Mon, 5 Aug 2024 14:52:30 +0200 Subject: [PATCH 05/28] offer task executions --- api/transfer.py | 12 ++++++--- app.py | 1 + controller/project/manager.py | 9 ++++--- controller/task_master/__init__.py | 0 controller/task_master/manager.py | 20 +++++++++++++++ fast_api/models.py | 4 +++ fast_api/routes/attribute.py | 4 --- fast_api/routes/task_execution.py | 41 ++++++++++++++++++++++-------- start | 1 + 9 files changed, 70 insertions(+), 22 deletions(-) create mode 100644 controller/task_master/__init__.py create mode 100644 controller/task_master/manager.py diff --git a/api/transfer.py b/api/transfer.py index ef17c047..147881cc 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -18,6 +18,7 @@ general, organization, tokenization, + project, ) from submodules.model.cognition_objects import ( @@ -39,7 +40,7 @@ from util import daemon, notification from controller.transfer.cognition.minio_upload import handle_cognition_file_upload -from controller.task_queue import manager as task_queue_manager +from controller.task_master import manager as task_master_manager from submodules.model.enums import TaskType, RecordTokenizationScope @@ -322,11 +323,14 @@ def init_file_import(task: UploadTask, project_id: str, is_global_update: bool) ) if task.file_type != "knowledge_base": only_usable_attributes = task.file_type == "records_add" - task_queue_manager.add_task( - project_id, + project_item = project.get(project_id) + org_id = str(project_item.organization_id) + task_master_manager.queue_task( + str(org_id), + str(task.user_id), TaskType.TOKENIZATION, - task.user_id, { + "project_id": str(project_id), "scope": RecordTokenizationScope.PROJECT.value, "include_rats": True, 
"only_uploaded_attributes": only_usable_attributes, diff --git a/app.py b/app.py index 652db346..39e59486 100644 --- a/app.py +++ b/app.py @@ -34,6 +34,7 @@ from fast_api.routes.record import router as record_router from fast_api.routes.weak_supervision import router as weak_supervision_router from fast_api.routes.labeling_tasks import router as labeling_tasks_router +from fast_api.routes.task_execution import router as task_execution_router from middleware.database_session import handle_db_session from middleware.starlette_tmp_middleware import DatabaseSessionHandler from starlette.applications import Starlette diff --git a/controller/project/manager.py b/controller/project/manager.py index d6ade258..0a168ac4 100644 --- a/controller/project/manager.py +++ b/controller/project/manager.py @@ -23,7 +23,7 @@ ) from fast_api.types import HuddleData, ProjectSize, GatesIntegrationData from util import daemon, notification -from controller.task_queue import manager as task_queue_manager +from controller.task_master import manager as task_master_manager from submodules.model.enums import TaskType, RecordTokenizationScope from submodules.model.business_objects import util as db_util from submodules.s3 import controller as s3 @@ -177,11 +177,12 @@ def import_sample_project( project_item = handler.import_sample_project( user_id, organization_id, name, project_type ) - task_queue_manager.add_task( - str(project_item.id), + task_master_manager.queue_task( + str(organization_id), + str(user_id), TaskType.TOKENIZATION, - user_id, { + "project_id": str(project_item.id), "scope": RecordTokenizationScope.PROJECT.value, "include_rats": True, "only_uploaded_attributes": False, diff --git a/controller/task_master/__init__.py b/controller/task_master/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/controller/task_master/manager.py b/controller/task_master/manager.py new file mode 100644 index 00000000..90de2ac1 --- /dev/null +++ b/controller/task_master/manager.py 
@@ -0,0 +1,20 @@ +import requests +import os +from submodules.model.business_objects import project as refinery_project +from submodules.model import enums +from typing import Any, Dict + +TASK_MASTER_URL = os.getenv("TASK_MASTER") + + +def queue_task( + org_id: str, user_id: str, task_type: enums.TaskType, task_info: Dict[str, Any] +) -> requests.Response: + + task_payload = { + "orgId": org_id, + "userId": user_id, + "taskType": task_type.value, + "taskInfo": task_info, + } + return requests.put(f"{TASK_MASTER_URL}/task/queue", json=task_payload) diff --git a/fast_api/models.py b/fast_api/models.py index dc5e0bdc..d658a625 100644 --- a/fast_api/models.py +++ b/fast_api/models.py @@ -441,3 +441,7 @@ class CancelTaskBody(BaseModel): class AttributeCalculationTaskExecutionBody(BaseModel): user_id: StrictStr attribute_id: StrictStr + + +class TokenizationTaskExecutionBody(BaseModel): + pass diff --git a/fast_api/routes/attribute.py b/fast_api/routes/attribute.py index e4395ab7..8675e435 100644 --- a/fast_api/routes/attribute.py +++ b/fast_api/routes/attribute.py @@ -92,7 +92,3 @@ def delete_user_attribute( ): manager.delete_attribute(project_id, body.attribute_id) return pack_json_result({"data": {"deleteUserAttribute": {"ok": True}}}) - -@router.post( - "{project}" -) \ No newline at end of file diff --git a/fast_api/routes/task_execution.py b/fast_api/routes/task_execution.py index 7840119e..f7177ba8 100644 --- a/fast_api/routes/task_execution.py +++ b/fast_api/routes/task_execution.py @@ -1,15 +1,24 @@ -from controller.attribute import manager -from controller.auth import manager as auth_manager -from typing import List, Union -from fast_api.models import AttributeCalculationTaskExecutionBody -from fast_api.routes.client_response import pack_json_result -from fastapi import APIRouter, Body, Depends, Query, Request -from submodules.model.enums import NotificationType -from submodules.model.util import sql_alchemy_to_dict -from util.notification import 
create_notification +from controller.attribute import manager as attribute_manager +from controller.tokenization import manager as tokenization_manager +from fast_api.models import ( + AttributeCalculationTaskExecutionBody, + TokenizationTaskExecutionBody, +) +from fastapi import APIRouter, status +from fastapi.responses import JSONResponse router = APIRouter() +SILENT_SUCCESS_RESPONSE = JSONResponse( + status_code=status.HTTP_200_OK, + content={"message": "Success"}, +) + + +@router.get("/ping") +def ping(): + return "pong" + @router.post( "/{project_id}/attribute-calculation", @@ -18,8 +27,20 @@ def calculate_attributes( project_id: str, attribute_calculation_task_execution: AttributeCalculationTaskExecutionBody, ): - manager.calculate_user_attribute_all_records( + attribute_manager.calculate_user_attribute_all_records( project_id, attribute_calculation_task_execution.user_id, attribute_calculation_task_execution.attribute_id, ) + return SILENT_SUCCESS_RESPONSE + + +@router.post( + "/{project_id}/tokenization", +) +def tokenization( + project_id: str, + tokenization_task_execution: TokenizationTaskExecutionBody, +): + # tokenization_manager + return SILENT_SUCCESS_RESPONSE diff --git a/start b/start index b92bef6b..7461c5ec 100755 --- a/start +++ b/start @@ -73,6 +73,7 @@ docker run -d --rm \ -e GATES=http://gates-gateway:80 \ -e COGNITION_GATEWAY=http://cognition-gateway:80 \ -e KRATOS_ADMIN_URL=http://kratos:4434 \ +-e TASK_MASTER=http://cognition-task-master:80 \ -e TASK_QUEUE_SLOTS=1 \ -e PRIORITY_TASK_QUEUE_SLOTS=1 \ -e INFERENCE_DIR=$INFERENCE_DIR \ From 7ab0a014b099c01e162f3e77ab824e2efe38bf18 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Tue, 6 Aug 2024 09:50:09 +0200 Subject: [PATCH 06/28] information source endpoint --- fast_api/models.py | 7 +++++-- fast_api/routes/task_execution.py | 29 ++++++++++++++++++++++------- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/fast_api/models.py b/fast_api/models.py index d658a625..fab05249 
100644 --- a/fast_api/models.py +++ b/fast_api/models.py @@ -443,5 +443,8 @@ class AttributeCalculationTaskExecutionBody(BaseModel): attribute_id: StrictStr -class TokenizationTaskExecutionBody(BaseModel): - pass +class InformationSourceTaskExecutionBody(BaseModel): + project_id: StrictStr + information_source_id: StrictStr + information_source_type: StrictStr + user_id: StrictStr diff --git a/fast_api/routes/task_execution.py b/fast_api/routes/task_execution.py index f7177ba8..ba1bb056 100644 --- a/fast_api/routes/task_execution.py +++ b/fast_api/routes/task_execution.py @@ -1,11 +1,14 @@ from controller.attribute import manager as attribute_manager -from controller.tokenization import manager as tokenization_manager +from controller.zero_shot import manager as zero_shot_manager +from controller.payload import manager as payload_manager from fast_api.models import ( AttributeCalculationTaskExecutionBody, - TokenizationTaskExecutionBody, + InformationSourceTaskExecutionBody, ) from fastapi import APIRouter, status from fastapi.responses import JSONResponse +from util import daemon +from submodules.model.enums import PayloadState, InformationSourceType router = APIRouter() @@ -27,20 +30,32 @@ def calculate_attributes( project_id: str, attribute_calculation_task_execution: AttributeCalculationTaskExecutionBody, ): - attribute_manager.calculate_user_attribute_all_records( + daemon.run( + attribute_manager.calculate_user_attribute_all_records, project_id, attribute_calculation_task_execution.user_id, attribute_calculation_task_execution.attribute_id, ) + return SILENT_SUCCESS_RESPONSE @router.post( - "/{project_id}/tokenization", + "/information-source", ) -def tokenization( +def information_source( project_id: str, - tokenization_task_execution: TokenizationTaskExecutionBody, + information_source_task_execution: InformationSourceTaskExecutionBody, ): - # tokenization_manager + project_id = information_source_task_execution.project_id + information_source_id = 
information_source_task_execution.information_source_id + information_source_type = information_source_task_execution.information_source_type + user_id = information_source_task_execution.user_id + if information_source_type == InformationSourceType.ZERO_SHOT.value: + zero_shot_manager.start_zero_shot_for_project_thread( + project_id, information_source_id, user_id + ) + else: + payload_manager.create_payload(project_id, information_source_id, user_id) + return SILENT_SUCCESS_RESPONSE From d2ffe0832c68671c2e124dadf318798c5c239b29 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Tue, 6 Aug 2024 10:12:03 +0200 Subject: [PATCH 07/28] return payload id --- fast_api/routes/task_execution.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/fast_api/routes/task_execution.py b/fast_api/routes/task_execution.py index ba1bb056..d7bb4db9 100644 --- a/fast_api/routes/task_execution.py +++ b/fast_api/routes/task_execution.py @@ -5,18 +5,13 @@ AttributeCalculationTaskExecutionBody, InformationSourceTaskExecutionBody, ) -from fastapi import APIRouter, status -from fastapi.responses import JSONResponse +from fastapi import APIRouter from util import daemon -from submodules.model.enums import PayloadState, InformationSourceType +from submodules.model.enums import InformationSourceType +from fast_api.routes.client_response import pack_json_result, SILENT_SUCCESS_RESPONSE router = APIRouter() -SILENT_SUCCESS_RESPONSE = JSONResponse( - status_code=status.HTTP_200_OK, - content={"message": "Success"}, -) - @router.get("/ping") def ping(): @@ -51,11 +46,14 @@ def information_source( information_source_id = information_source_task_execution.information_source_id information_source_type = information_source_task_execution.information_source_type user_id = information_source_task_execution.user_id + # already threaded in managers if information_source_type == InformationSourceType.ZERO_SHOT.value: - 
zero_shot_manager.start_zero_shot_for_project_thread( + payload_id = zero_shot_manager.start_zero_shot_for_project_thread( project_id, information_source_id, user_id ) else: - payload_manager.create_payload(project_id, information_source_id, user_id) + payload_id = payload_manager.create_payload( + project_id, information_source_id, user_id + ) - return SILENT_SUCCESS_RESPONSE + return pack_json_result({"payload_id": payload_id}) From 4a91bdd77ff791a8b87234469cf6d72109c2c372 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Tue, 6 Aug 2024 10:50:33 +0200 Subject: [PATCH 08/28] information source, project id handling --- fast_api/routes/heuristic.py | 10 ++++++---- submodules/model | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/fast_api/routes/heuristic.py b/fast_api/routes/heuristic.py index 7a505ca0..0e55dde1 100644 --- a/fast_api/routes/heuristic.py +++ b/fast_api/routes/heuristic.py @@ -11,7 +11,7 @@ from submodules.model.enums import InformationSourceType from submodules.model.util import pack_edges_node, sql_alchemy_to_dict from util import notification -from controller.task_queue import manager as task_queue_manager +from controller.task_master import manager as task_master_manager from submodules.model import enums from controller.auth import kratos @@ -187,6 +187,7 @@ def set_payload( heuristic_id: str, ): user = auth_manager.get_user_by_info(request.state.info) + org_id = user.organization_id information_source_item = information_source.get(project_id, heuristic_id) if information_source_item.type == enums.InformationSourceType.CROWD_LABELER.value: return pack_json_result({"data": {"createPayload": None}}) @@ -194,11 +195,12 @@ def set_payload( information_source_item.type != enums.InformationSourceType.ZERO_SHOT.value ) - queue_id, _ = task_queue_manager.add_task( - project_id, - enums.TaskType.INFORMATION_SOURCE, + queue_id, _ = task_master_manager.queue_task( + org_id, user.id, + enums.TaskType.INFORMATION_SOURCE, { + 
"project_id": project_id, "information_source_id": heuristic_id, "source_type": information_source_item.type, }, diff --git a/submodules/model b/submodules/model index 9bca0173..978e1bfd 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 9bca0173235d277c39009020e2ffd05187cb5521 +Subproject commit 978e1bfd6190afdf1e5432ad5101e05dff08f512 From abc0fbba91c630078de1b9e90f814e07794ded21 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Tue, 6 Aug 2024 11:10:01 +0200 Subject: [PATCH 09/28] import wizard org ids --- .../transfer/cognition/import_wizard.py | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/controller/transfer/cognition/import_wizard.py b/controller/transfer/cognition/import_wizard.py index 67988510..afa38c76 100644 --- a/controller/transfer/cognition/import_wizard.py +++ b/controller/transfer/cognition/import_wizard.py @@ -23,7 +23,7 @@ from controller.labeling_task_label import manager as label_manager from controller.information_source import manager as information_source_manager from controller.attribute import manager as attribute_manager -from controller.task_queue import manager as task_queue_manager +from controller.task_master import manager as task_master_manager from controller.embedding import manager as embedding_manager from .bricks_loader import get_bricks_code_from_group, get_bricks_code_from_endpoint @@ -112,6 +112,7 @@ def __finalize_setup( __finalize_setup_for( CognitionProjects.REFERENCE, reference_project_id, + organization_id, user_id, project_language, file_additional_info, @@ -127,6 +128,7 @@ def __finalize_setup( __finalize_setup_for( CognitionProjects.QUESTION, question_project_id, + organization_id, user_id, project_language, file_additional_info, @@ -168,7 +170,7 @@ def __finalize_setup( organization_id=organization_id, ) - __add_start_gates_for(reference_project_id, task_list) + __add_start_gates_for(reference_project_id, organization_id, task_list) # 
currently disabled since not part of initial offering # __add_start_gates_for(question_project_id, task_list) @@ -176,6 +178,7 @@ def __finalize_setup( task_list.append( { + "organization_id": organization_id, "project_id": reference_project_id, "task_type": enums.TaskType.TASK_QUEUE_ACTION.value, "action": { @@ -207,8 +210,8 @@ def __finalize_setup( continue break - task_id, position = task_queue_manager.add_task( - reference_project_id, enums.TaskType.TASK_QUEUE, user_id, task_list + task_id, position = task_master_manager.queue_task( + organization_id, user_id, enums.TaskType.TASK_QUEUE, task_list ) notification.send_organization_update( @@ -270,7 +273,7 @@ def __add_websocket_message_queue_item( action["organization_id"] = organization_id task_list.append( { - "project_id": sender_project_id, + "organization_id": organization_id, "task_type": enums.TaskType.TASK_QUEUE_ACTION.value, "action": action, } @@ -279,14 +282,16 @@ def __add_websocket_message_queue_item( def __add_weakly_supervise_all_valid( project_id: str, + org_id: str, task_list: List[Dict[str, str]], ) -> None: task_list.append( { - "project_id": project_id, + "organization_id": org_id, "task_type": enums.TaskType.TASK_QUEUE_ACTION.value, "action": { "action_type": enums.TaskQueueAction.RUN_WEAK_SUPERVISION.value, + "project_id": project_id, }, } ) @@ -294,11 +299,12 @@ def __add_weakly_supervise_all_valid( def __add_start_gates_for( project_id: str, + org_id: str, task_list: List[Dict[str, str]], ) -> None: task_list.append( { - "project_id": project_id, + "organization_id": org_id, "task_type": enums.TaskType.TASK_QUEUE_ACTION.value, "action": { "action_type": enums.TaskQueueAction.START_GATES.value, @@ -312,6 +318,7 @@ def __add_start_gates_for( def __finalize_setup_for( project_type: CognitionProjects, project_id: str, + org_id: str, user_id: str, project_language: str, file_additional_info: Dict[str, Any], @@ -333,6 +340,7 @@ def __finalize_setup_for( if bricks: __load_ac_from_bricks_group( 
project_id, + org_id, bricks["group"], bricks.get("type", "generator"), bricks.get("type_lookup", {}), @@ -355,6 +363,7 @@ def __finalize_setup_for( ) __create_attribute_with( project_id, + org_id, code, attribute["name"], attribute["type"], @@ -400,6 +409,7 @@ def __finalize_setup_for( project_id, labeling_task_id, user_id, + org_id, bricks["group"], bricks.get("type", "classifier"), target_data, @@ -451,7 +461,7 @@ def __finalize_setup_for( target_data, name_prefix=bricks.get("function_prefix"), ) - __add_weakly_supervise_all_valid(project_id, task_list) + __add_weakly_supervise_all_valid(project_id, org_id, task_list) token_ref.request_new() @@ -482,6 +492,7 @@ def __load_lf_from_bricks_group( target_project_id: str, target_task_id: str, user_id: str, + org_id: str, group_key: str, bricks_type: str, target_data: Dict[str, str], @@ -511,6 +522,7 @@ def __load_lf_from_bricks_group( if append_to_task_list: task_list.append( { + "organization_id": org_id, "project_id": target_project_id, "task_type": enums.TaskType.INFORMATION_SOURCE.value, "information_source_id": str(item.id), @@ -553,6 +565,7 @@ def __load_active_learner_from_bricks_group( def __load_ac_from_bricks_group( target_project_id: str, + org_id: str, group_key: str, bricks_type: str, data_type_lookup: Dict[str, str], @@ -577,6 +590,7 @@ def __load_ac_from_bricks_group( ) __create_attribute_with( target_project_id, + org_id, code, name, data_type, @@ -587,6 +601,7 @@ def __load_ac_from_bricks_group( def __add_embedding( target_project_id: str, + org_id: str, target_info: Dict[str, str], project_language: str, filter_columns: List[str], @@ -618,6 +633,7 @@ def __add_embedding( ) task_list.append( { + "organization_id": org_id, "project_id": target_project_id, "task_type": enums.TaskType.EMBEDDING.value, "embedding_type": target_embedding_type, @@ -635,6 +651,7 @@ def __add_embedding( if create_outlier_slice: task_list.append( { + "organization_id": org_id, "project_id": target_project_id, "task_type": 
enums.TaskType.TASK_QUEUE_ACTION.value, "action": { @@ -648,6 +665,7 @@ def __add_embedding( def __create_attribute_with( project_id: str, + org_id: str, code: str, name: str, attribute_type: str, @@ -663,6 +681,7 @@ def __create_attribute_with( if append_to_task_list: task_list.append( { + "organization_id": org_id, "project_id": project_id, "task_type": enums.TaskType.ATTRIBUTE_CALCULATION.value, "attribute_id": attribute_id, From ca73f3e82a8338f1d5d500b9fb0bbf2e9d087716 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Tue, 6 Aug 2024 11:40:40 +0200 Subject: [PATCH 10/28] project id tokenizer --- api/project.py | 15 ++++++---- controller/attribute/manager.py | 44 ++++++++++++++++++++++-------- controller/project/manager.py | 8 ++++-- controller/tokenization/manager.py | 11 ++++---- controller/transfer/manager.py | 17 +++++++----- fast_api/models.py | 1 + fast_api/routes/task_execution.py | 1 + 7 files changed, 66 insertions(+), 31 deletions(-) diff --git a/api/project.py b/api/project.py index 0dc5f2d5..6e0b1cbc 100644 --- a/api/project.py +++ b/api/project.py @@ -10,7 +10,7 @@ from submodules.model import events from util import doc_ock, notification, adapter -from controller.task_queue import manager as task_queue_manager +from controller.task_master import manager as task_master_manager from submodules.model.enums import TaskType, RecordTokenizationScope logging.basicConfig(level=logging.DEBUG) @@ -71,17 +71,22 @@ async def post(self, request_body) -> JSONResponse: adapter.check(data, project.id, user.id) project_manager.add_workflow_store_data_to_project( - user_id=user.id, project_id=project.id, file_name=name, data=data + user_id=user.id, + project_id=project.id, + org_id=project.organization_id, + file_name=name, + data=data, ) - task_queue_manager.add_task( - str(project.id), - TaskType.TOKENIZATION, + task_master_manager.queue_task( + str(organization.id), str(user.id), + TaskType.TOKENIZATION, { "scope": RecordTokenizationScope.PROJECT.value, 
"include_rats": True, "only_uploaded_attributes": False, + "project_id": str(project.id), }, ) diff --git a/controller/attribute/manager.py b/controller/attribute/manager.py index c5a8ded8..5850b2ca 100644 --- a/controller/attribute/manager.py +++ b/controller/attribute/manager.py @@ -18,7 +18,7 @@ ) from util import daemon, notification -from controller.task_queue import manager as task_queue_manager +from controller.task_master import manager as task_master_manager from submodules.model.enums import TaskType from . import util from sqlalchemy import sql @@ -147,7 +147,11 @@ def delete_attribute(project_id: str, attribute_id: str) -> None: def add_running_id( - user_id: str, project_id: str, attribute_name: str, for_retokenization: bool = True + user_id: str, + org_id: str, + project_id: str, + attribute_name: str, + for_retokenization: bool = True, ) -> None: if attribute.get_by_name(project_id, attribute_name): raise ValueError(f"attribute with name {attribute_name} already exists") @@ -155,35 +159,49 @@ def add_running_id( # added threading for session management because otherwise this can sometimes create a deadlock thread = daemon.prepare_thread( - __add_running_id, user_id, project_id, attribute_name, for_retokenization + __add_running_id, + user_id, + org_id, + project_id, + attribute_name, + for_retokenization, ) thread.start() thread.join() def __add_running_id( - user_id: str, project_id: str, attribute_name: str, for_retokenization: bool = True + user_id: str, + org_id: str, + project_id: str, + attribute_name: str, + for_retokenization: bool = True, ): session_token = general.get_ctx_token() attribute.add_running_id( project_id, attribute_name, for_retokenization, with_commit=True ) if for_retokenization: - task_queue_manager.add_task( - project_id, + task_master_manager.queue_task( + str(org_id), + str(user_id), TaskType.TOKENIZATION, - user_id, { "scope": RecordTokenizationScope.PROJECT.value, "include_rats": True, "only_uploaded_attributes": False, 
+ "project_id": str(project_id), }, ) general.remove_and_refresh_session(session_token) def calculate_user_attribute_all_records( - project_id: str, user_id: str, attribute_id: str, include_rats: bool = True + project_id: str, + org_id: str, + user_id: str, + attribute_id: str, + include_rats: bool = True, ) -> None: if attribute.get_all( project_id=project_id, state_filter=[AttributeState.RUNNING.value] @@ -231,6 +249,7 @@ def calculate_user_attribute_all_records( daemon.run( __calculate_user_attribute_all_records, project_id, + org_id, user_id, attribute_id, include_rats, @@ -238,7 +257,7 @@ def calculate_user_attribute_all_records( def __calculate_user_attribute_all_records( - project_id: str, user_id: str, attribute_id: str, include_rats: bool + project_id: str, org_id: str, user_id: str, attribute_id: str, include_rats: bool ) -> None: session_token = general.get_ctx_token() try: @@ -295,14 +314,15 @@ def __calculate_user_attribute_all_records( project_id, attribute_id, "Triggering tokenization." 
) try: - task_queue_manager.add_task( - project_id, + task_master_manager.queue_task( + str(org_id), TaskType.TOKENIZATION, - user_id, + str(user_id), { "scope": RecordTokenizationScope.ATTRIBUTE.value, "attribute_id": str(attribute_item.id), "include_rats": include_rats, + "project_id": str(project_id), }, ) diff --git a/controller/project/manager.py b/controller/project/manager.py index 0a168ac4..71c37bce 100644 --- a/controller/project/manager.py +++ b/controller/project/manager.py @@ -315,7 +315,11 @@ def __get_first_data_id(project_id: str, user_id: str, huddle_type: str) -> str: def add_workflow_store_data_to_project( - project_id: str, user_id: str, file_name: str, data: List[Dict[str, Any]] + project_id: str, + org_id: str, + user_id: str, + file_name: str, + data: List[Dict[str, Any]], ): upload_task = upload_task_manager.create_upload_task( user_id=user_id, @@ -332,7 +336,7 @@ def add_workflow_store_data_to_project( upload_task, enums.RecordCategory.SCALE.value, ) - check_and_add_running_id(project_id, user_id) + check_and_add_running_id(project_id, org_id, user_id) upload_task_manager.update_upload_task_to_finished(upload_task) diff --git a/controller/tokenization/manager.py b/controller/tokenization/manager.py index f16a1b9f..61cb4072 100644 --- a/controller/tokenization/manager.py +++ b/controller/tokenization/manager.py @@ -15,7 +15,7 @@ request_tokenize_record, ) import logging -from controller.task_queue import manager as task_queue_manager +from controller.task_master import manager as task_master_manager from submodules.model.enums import TaskType, RecordTokenizationScope logging.basicConfig(level=logging.INFO) @@ -91,15 +91,16 @@ def start_record_tokenization(project_id: str, record_id: str) -> None: ) -def start_project_tokenization(project_id: str, user_id: str) -> None: - task_queue_manager.add_task( - project_id, +def start_project_tokenization(project_id: str, org_id: str, user_id: str) -> None: + task_master_manager.queue_task( + 
str(org_id), + str(user_id), TaskType.TOKENIZATION, - user_id, { "scope": RecordTokenizationScope.PROJECT.value, "include_rats": True, "only_uploaded_attributes": False, + "project_id": str(project_id), }, ) diff --git a/controller/transfer/manager.py b/controller/transfer/manager.py index ce7416b5..c5245364 100644 --- a/controller/transfer/manager.py +++ b/controller/transfer/manager.py @@ -40,7 +40,7 @@ from controller.labeling_task import manager as labeling_task_manager from controller.labeling_task_label import manager as labeling_task_label_manager from submodules.model.business_objects import record_label_association as rla -from controller.task_queue import manager as task_queue_manager +from controller.task_master import manager as task_master_manager from submodules.model.enums import TaskType, RecordTokenizationScope @@ -140,7 +140,7 @@ def import_records_from_json( os.remove(file_path) -def check_and_add_running_id(project_id: str, user_id: str): +def check_and_add_running_id(project_id: str, org_id: str, user_id: str): attributes = attribute.get_all(project_id) add_running_id = True for att in attributes: @@ -148,7 +148,9 @@ def check_and_add_running_id(project_id: str, user_id: str): add_running_id = False break if add_running_id: - attribute_manager.add_running_id(user_id, project_id, "running_id", False) + attribute_manager.add_running_id( + user_id, org_id, project_id, "running_id", False + ) def import_project(project_id: str, task: UploadTask) -> None: @@ -304,7 +306,7 @@ def generate_labelstudio_template( ) -def import_label_studio_file(project_id: str, upload_task_id: str) -> None: +def import_label_studio_file(project_id: str, org_id: str, upload_task_id: str) -> None: ctx_token = general.get_ctx_token() try: if attribute.get_all(project_id): @@ -312,14 +314,15 @@ def import_label_studio_file(project_id: str, upload_task_id: str) -> None: else: project_creation_manager.manage_data_import(project_id, upload_task_id) task = 
upload_task.get(project_id, upload_task_id) - task_queue_manager.add_task( - project_id, - TaskType.TOKENIZATION, + task_master_manager.queue_task( + str(org_id), str(task.user_id), + TaskType.TOKENIZATION, { "scope": RecordTokenizationScope.PROJECT.value, "include_rats": True, "only_uploaded_attributes": False, + "project_id": str(project_id), }, ) upload_task.update( diff --git a/fast_api/models.py b/fast_api/models.py index fab05249..9c16a3be 100644 --- a/fast_api/models.py +++ b/fast_api/models.py @@ -441,6 +441,7 @@ class CancelTaskBody(BaseModel): class AttributeCalculationTaskExecutionBody(BaseModel): user_id: StrictStr attribute_id: StrictStr + organization_id: StrictStr class InformationSourceTaskExecutionBody(BaseModel): diff --git a/fast_api/routes/task_execution.py b/fast_api/routes/task_execution.py index d7bb4db9..8445048c 100644 --- a/fast_api/routes/task_execution.py +++ b/fast_api/routes/task_execution.py @@ -28,6 +28,7 @@ def calculate_attributes( daemon.run( attribute_manager.calculate_user_attribute_all_records, project_id, + attribute_calculation_task_execution.organization_id, attribute_calculation_task_execution.user_id, attribute_calculation_task_execution.attribute_id, ) From 2ec98282ab400c30319b605be3edce9b8a1ce713 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Tue, 6 Aug 2024 14:03:43 +0200 Subject: [PATCH 11/28] attribute calculation, embedding --- controller/task_master/manager.py | 8 ++++++-- fast_api/models.py | 3 ++- fast_api/routes/embedding.py | 9 ++++++--- fast_api/routes/project_setting.py | 19 +++++++++++-------- fast_api/routes/task_execution.py | 6 ++---- 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/controller/task_master/manager.py b/controller/task_master/manager.py index 90de2ac1..506e03f6 100644 --- a/controller/task_master/manager.py +++ b/controller/task_master/manager.py @@ -1,6 +1,5 @@ import requests import os -from submodules.model.business_objects import project as refinery_project from 
submodules.model import enums from typing import Any, Dict @@ -8,7 +7,11 @@ def queue_task( - org_id: str, user_id: str, task_type: enums.TaskType, task_info: Dict[str, Any] + org_id: str, + user_id: str, + task_type: enums.TaskType, + task_info: Dict[str, Any], + priority: bool = False, ) -> requests.Response: task_payload = { @@ -16,5 +19,6 @@ def queue_task( "userId": user_id, "taskType": task_type.value, "taskInfo": task_info, + "priority": priority, } return requests.put(f"{TASK_MASTER_URL}/task/queue", json=task_payload) diff --git a/fast_api/models.py b/fast_api/models.py index 9c16a3be..d2d9ae54 100644 --- a/fast_api/models.py +++ b/fast_api/models.py @@ -439,9 +439,10 @@ class CancelTaskBody(BaseModel): class AttributeCalculationTaskExecutionBody(BaseModel): + organization_id: StrictStr + project_id: StrictStr user_id: StrictStr attribute_id: StrictStr - organization_id: StrictStr class InformationSourceTaskExecutionBody(BaseModel): diff --git a/fast_api/routes/embedding.py b/fast_api/routes/embedding.py index 364cf0ee..7bd938a2 100644 --- a/fast_api/routes/embedding.py +++ b/fast_api/routes/embedding.py @@ -6,6 +6,7 @@ from fastapi import APIRouter, Body, Depends, Request from controller.embedding import manager from controller.task_queue import manager as task_queue_manager +from controller.task_master import manager as task_master_manager from controller.auth import manager as auth_manager from controller.embedding.connector import collection_on_qdrant from submodules.model.business_objects import project @@ -147,6 +148,7 @@ def create_embedding( body: CreateEmbeddingBody = Body(...), ): user = auth_manager.get_user_by_info(request.state.info) + org_id = user.organization_id body.config = json.loads(body.config) embedding_type = body.config[ "embeddingType" @@ -166,11 +168,12 @@ def create_embedding( "version": body.config.get("version"), } - task_queue_manager.add_task( - project_id, - TaskType.EMBEDDING, + task_master_manager.queue_task( + org_id, 
user.id, + TaskType.EMBEDDING, { + "project_id": project_id, "embedding_type": embedding_type, "attribute_id": body.attribute_id, "embedding_name": manager.get_embedding_name( diff --git a/fast_api/routes/project_setting.py b/fast_api/routes/project_setting.py index c0a46b98..79f32cd7 100644 --- a/fast_api/routes/project_setting.py +++ b/fast_api/routes/project_setting.py @@ -11,7 +11,6 @@ from typing import Dict from fastapi.responses import JSONResponse -from controller.task_queue import manager from controller.auth import manager as auth_manager from controller.transfer import manager as transfer_manager from controller.attribute import manager as attribute_manager @@ -19,6 +18,8 @@ from controller.labeling_task import manager as task_manager from controller.project import manager as project_manager from controller.record import manager as record_manager +from controller.task_master import manager as task_master_manager +from controller.task_queue import manager as task_queue_manager from fast_api.routes.client_response import pack_json_result from submodules.model.enums import TaskType from submodules.model.util import sql_alchemy_to_dict @@ -55,7 +56,7 @@ def get_queued_tasks( project_id: str, task_type: str, ) -> Dict: - data = manager.get_all_waiting_by_type(project_id, task_type) + data = task_queue_manager.get_all_waiting_by_type(project_id, task_type) data_dict = sql_alchemy_to_dict(data, column_whitelist=QUEUED_TASKS_WHITELIST) return pack_json_result({"data": {"queuedTasks": data_dict}}) @@ -265,14 +266,16 @@ def calculate_user_attribute_all_records( project_id: str, body: CalculateUserAttributeAllRecordsBody = Body(...), ): - user_id = auth_manager.get_user_by_info(request.state.info).id - - manager.add_task( - project_id, + user = auth_manager.get_user_by_info(request.state.info) + user_id = user.id + org_id = user.organization_id + task_master_manager.queue_task( + str(org_id), + str(user_id), TaskType.ATTRIBUTE_CALCULATION, - user_id, { - 
"attribute_id": body.attribute_id, + "project_id": str(project_id), + "attribute_id": str(body.attribute_id), }, True, ) diff --git a/fast_api/routes/task_execution.py b/fast_api/routes/task_execution.py index 8445048c..be3a4d2e 100644 --- a/fast_api/routes/task_execution.py +++ b/fast_api/routes/task_execution.py @@ -19,15 +19,14 @@ def ping(): @router.post( - "/{project_id}/attribute-calculation", + "/attribute-calculation", ) def calculate_attributes( - project_id: str, attribute_calculation_task_execution: AttributeCalculationTaskExecutionBody, ): daemon.run( attribute_manager.calculate_user_attribute_all_records, - project_id, + attribute_calculation_task_execution.project_id, attribute_calculation_task_execution.organization_id, attribute_calculation_task_execution.user_id, attribute_calculation_task_execution.attribute_id, @@ -40,7 +39,6 @@ def calculate_attributes( "/information-source", ) def information_source( - project_id: str, information_source_task_execution: InformationSourceTaskExecutionBody, ): project_id = information_source_task_execution.project_id From 9097b80021e76928d6f219106478e2d1a3231fa1 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Tue, 6 Aug 2024 15:01:49 +0200 Subject: [PATCH 12/28] embedding --- controller/task_master/manager.py | 6 ++++++ fast_api/routes/embedding.py | 10 ++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/controller/task_master/manager.py b/controller/task_master/manager.py index 506e03f6..4fa5216d 100644 --- a/controller/task_master/manager.py +++ b/controller/task_master/manager.py @@ -22,3 +22,9 @@ def queue_task( "priority": priority, } return requests.put(f"{TASK_MASTER_URL}/task/queue", json=task_payload) + + +def delete_task(org_id: str, task_id: str) -> requests.Response: + return requests.delete( + f"{TASK_MASTER_URL}/task/queue", json={"orgId": org_id, "taskId": task_id} + ) diff --git a/fast_api/routes/embedding.py b/fast_api/routes/embedding.py index 7bd938a2..9f722f86 
100644 --- a/fast_api/routes/embedding.py +++ b/fast_api/routes/embedding.py @@ -5,7 +5,6 @@ from controller.misc import manager as misc from fastapi import APIRouter, Body, Depends, Request from controller.embedding import manager -from controller.task_queue import manager as task_queue_manager from controller.task_master import manager as task_master_manager from controller.auth import manager as auth_manager from controller.embedding.connector import collection_on_qdrant @@ -115,10 +114,10 @@ def get_embeddings(project_id: str) -> List: ) def delete_from_task_queue( request: Request, - project_id: str, task_id: str, ): - task_queue_manager.remove_task_from_queue(project_id, task_id) + org_id = auth_manager.get_user_by_info(request.state.info).organization_id + task_master_manager.delete_task(org_id, task_id) return pack_json_result({"data": {"deleteFromTaskQueue": {"ok": True}}}) @@ -148,7 +147,6 @@ def create_embedding( body: CreateEmbeddingBody = Body(...), ): user = auth_manager.get_user_by_info(request.state.info) - org_id = user.organization_id body.config = json.loads(body.config) embedding_type = body.config[ "embeddingType" @@ -169,8 +167,8 @@ def create_embedding( } task_master_manager.queue_task( - org_id, - user.id, + str(user.organization_id), + str(user.id), TaskType.EMBEDDING, { "project_id": project_id, From 8d81ae0daab6816f209148a10d08872258f683df Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Tue, 6 Aug 2024 18:47:45 +0200 Subject: [PATCH 13/28] data slice --- controller/transfer/cognition/minio_upload.py | 21 ++++------- fast_api/models.py | 6 ++++ fast_api/routes/task_execution.py | 36 +++++++++++++++++++ 3 files changed, 48 insertions(+), 15 deletions(-) diff --git a/controller/transfer/cognition/minio_upload.py b/controller/transfer/cognition/minio_upload.py index 4229f5ae..8eb5bdc0 100644 --- a/controller/transfer/cognition/minio_upload.py +++ b/controller/transfer/cognition/minio_upload.py @@ -3,7 +3,7 @@ from 
submodules.model.business_objects import project from submodules.model.cognition_objects import conversation from submodules.model.enums import TaskType -from controller.task_queue import manager as task_queue_manager +from controller.task_master import manager as task_master_manager def handle_cognition_file_upload(path_parts: List[str]): @@ -18,26 +18,17 @@ def handle_cognition_file_upload(path_parts: List[str]): if not cognition_prj: return - project_id = None - if cognition_prj.refinery_references_project_id: - project_id = str(cognition_prj.refinery_references_project_id) - else: - project_id = str( - project.get_or_create_queue_project( - cognition_prj.organization_id, cognition_prj.created_by, True - ).id - ) conversation_item = conversation.get(cognition_project_id, conversation_id) if not conversation_item: return - task_queue_manager.add_task( - project_id, + task_master_manager.queue_task( + str(cognition_prj.organization_id), + str(conversation_item.created_by), TaskType.PARSE_COGNITION_TMP_FILE, - conversation_item.created_by, { - "cognition_project_id": cognition_project_id, - "conversation_id": conversation_id, + "cognition_project_id": str(cognition_project_id), + "conversation_id": str(conversation_id), "minio_path": "/".join(path_parts[1:]), "bucket": path_parts[0], }, diff --git a/fast_api/models.py b/fast_api/models.py index d2d9ae54..aba92e5a 100644 --- a/fast_api/models.py +++ b/fast_api/models.py @@ -450,3 +450,9 @@ class InformationSourceTaskExecutionBody(BaseModel): information_source_id: StrictStr information_source_type: StrictStr user_id: StrictStr + + +class DataSliceActionExecutionBody(BaseModel): + project_id: StrictStr + user_id: StrictStr + embedding_id: StrictStr diff --git a/fast_api/routes/task_execution.py b/fast_api/routes/task_execution.py index be3a4d2e..2177d0d6 100644 --- a/fast_api/routes/task_execution.py +++ b/fast_api/routes/task_execution.py @@ -1,9 +1,11 @@ from controller.attribute import manager as attribute_manager 
from controller.zero_shot import manager as zero_shot_manager from controller.payload import manager as payload_manager +from controller.data_slice import manager as data_slice_manager from fast_api.models import ( AttributeCalculationTaskExecutionBody, InformationSourceTaskExecutionBody, + DataSliceActionExecutionBody, ) from fastapi import APIRouter from util import daemon @@ -56,3 +58,37 @@ def information_source( ) return pack_json_result({"payload_id": payload_id}) + + +@router.post( + "/data-slice", +) +def data_slice( + data_slice_action_execution: DataSliceActionExecutionBody, +): + project_id = data_slice_action_execution.project_id + embedding_id = data_slice_action_execution.embedding_id + user_id = data_slice_action_execution.user_id + + daemon.run( + data_slice_manager.create_outlier_slice, project_id, user_id, str(embedding_id) + ) + + return SILENT_SUCCESS_RESPONSE + + +@router.post( + "/start-gates", +) +def start_gates( + start_gates_action_execution: StartGatesActionExecutionBody, +): + project_id = data_slice_action_execution.project_id + embedding_id = data_slice_action_execution.embedding_id + user_id = data_slice_action_execution.user_id + + daemon.run( + data_slice_manager.create_outlier_slice, project_id, user_id, str(embedding_id) + ) + + return SILENT_SUCCESS_RESPONSE From c8c686f0f7ad1c54f5786932dbb31672dde3906e Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Tue, 6 Aug 2024 18:48:21 +0200 Subject: [PATCH 14/28] gate removal --- fast_api/routes/task_execution.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/fast_api/routes/task_execution.py b/fast_api/routes/task_execution.py index 2177d0d6..140266c3 100644 --- a/fast_api/routes/task_execution.py +++ b/fast_api/routes/task_execution.py @@ -75,20 +75,3 @@ def data_slice( ) return SILENT_SUCCESS_RESPONSE - - -@router.post( - "/start-gates", -) -def start_gates( - start_gates_action_execution: StartGatesActionExecutionBody, -): - project_id = 
data_slice_action_execution.project_id - embedding_id = data_slice_action_execution.embedding_id - user_id = data_slice_action_execution.user_id - - daemon.run( - data_slice_manager.create_outlier_slice, project_id, user_id, str(embedding_id) - ) - - return SILENT_SUCCESS_RESPONSE From ae9467f17146b06be3b58c2c234b05398c022808 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Wed, 7 Aug 2024 11:54:08 +0200 Subject: [PATCH 15/28] remove start gates --- controller/gates/manager.py | 31 ------------------------------- fast_api/routes/task_execution.py | 5 ----- 2 files changed, 36 deletions(-) delete mode 100644 controller/gates/manager.py diff --git a/controller/gates/manager.py b/controller/gates/manager.py deleted file mode 100644 index 326d992f..00000000 --- a/controller/gates/manager.py +++ /dev/null @@ -1,31 +0,0 @@ -from typing import List, Tuple - -from submodules.model.business_objects import ( - information_source as refinery_information_source, - embedding as refinery_embedding, -) -from submodules.model.enums import PayloadState, EmbeddingState - - -from . 
import gates_service - - -def start_gates_container( - project_id: str, select_available_heuristics: bool = True -) -> bool: - heuristics, embeddings = [], [] - if select_available_heuristics: - heuristics, embeddings = __get_relevant_gates_ids(project_id) - return gates_service.start_gates_project(project_id, heuristics, embeddings) - - -def __get_relevant_gates_ids(project_id: str) -> Tuple[List[str], List[str]]: - embeddings = [ - str(e.id) - for e in refinery_embedding.get_all_embeddings_by_project_id(project_id) - if e.state == EmbeddingState.FINISHED.value - ] - - states = refinery_information_source.get_all_states(project_id) - heuristics = [key for key in states if states[key] == PayloadState.FINISHED.value] - return heuristics, embeddings diff --git a/fast_api/routes/task_execution.py b/fast_api/routes/task_execution.py index 140266c3..179ad9a6 100644 --- a/fast_api/routes/task_execution.py +++ b/fast_api/routes/task_execution.py @@ -15,11 +15,6 @@ router = APIRouter() -@router.get("/ping") -def ping(): - return "pong" - - @router.post( "/attribute-calculation", ) From 255e2d6b8cc955021f3f8e49b81528b405080960 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Wed, 7 Aug 2024 15:19:35 +0200 Subject: [PATCH 16/28] task executions, wizard improvements --- .../transfer/cognition/import_wizard.py | 15 ++++++++++++-- controller/transfer/cognition/minio_upload.py | 1 - fast_api/models.py | 5 +++++ fast_api/routes/task_execution.py | 20 +++++++++++++++++++ fast_api/routes/zero_shot.py | 12 ++++++----- 5 files changed, 45 insertions(+), 8 deletions(-) diff --git a/controller/transfer/cognition/import_wizard.py b/controller/transfer/cognition/import_wizard.py index afa38c76..7ac828ec 100644 --- a/controller/transfer/cognition/import_wizard.py +++ b/controller/transfer/cognition/import_wizard.py @@ -34,6 +34,7 @@ FREE_API_REQUEST_URL, ) from .util import send_log_message +import traceback class TokenRef: @@ -53,6 +54,7 @@ def 
prepare_and_finalize_setup(cognition_project_id: str, task_id: str) -> None: __finalize_setup(token_ref, cognition_project_id, task_id) except Exception as e: print(f"Error during wizard setup: {str(e)}", flush=True) + print(traceback.format_exc()) finally: token_ref.cleanup() @@ -158,6 +160,7 @@ def __finalize_setup( __finalize_setup_for( CognitionProjects.RELEVANCE, relevance_project_id, + organization_id, user_id, project_language, file_additional_info, @@ -210,9 +213,16 @@ def __finalize_setup( continue break - task_id, position = task_master_manager.queue_task( - organization_id, user_id, enums.TaskType.TASK_QUEUE, task_list + queue_response = task_master_manager.queue_task( + organization_id, + user_id, + enums.TaskType.TASK_QUEUE, + {"project_id": cognition_project_id, "task_list": task_list}, ) + queue_info = queue_response.json() + print(queue_info, flush=True) + task_id = queue_info["task_id"] + position = queue_info.get("position") notification.send_organization_update( cognition_project_id, @@ -435,6 +445,7 @@ def __finalize_setup_for( filter_columns = [] embedding_name = __add_embedding( project_id, + org_id, embedding.get("target", {}), project_language, filter_columns, diff --git a/controller/transfer/cognition/minio_upload.py b/controller/transfer/cognition/minio_upload.py index 8eb5bdc0..60b9827b 100644 --- a/controller/transfer/cognition/minio_upload.py +++ b/controller/transfer/cognition/minio_upload.py @@ -1,6 +1,5 @@ from typing import List from submodules.model.cognition_objects import project as cognition_project -from submodules.model.business_objects import project from submodules.model.cognition_objects import conversation from submodules.model.enums import TaskType from controller.task_master import manager as task_master_manager diff --git a/fast_api/models.py b/fast_api/models.py index aba92e5a..e8c1dd1e 100644 --- a/fast_api/models.py +++ b/fast_api/models.py @@ -456,3 +456,8 @@ class DataSliceActionExecutionBody(BaseModel): 
project_id: StrictStr user_id: StrictStr embedding_id: StrictStr + + +class WeakSupervisionActionExecutionBody(BaseModel): + project_id: StrictStr + user_id: StrictStr diff --git a/fast_api/routes/task_execution.py b/fast_api/routes/task_execution.py index 179ad9a6..ea423771 100644 --- a/fast_api/routes/task_execution.py +++ b/fast_api/routes/task_execution.py @@ -2,10 +2,12 @@ from controller.zero_shot import manager as zero_shot_manager from controller.payload import manager as payload_manager from controller.data_slice import manager as data_slice_manager +from controller.weak_supervision import manager as weak_supervision_manager from fast_api.models import ( AttributeCalculationTaskExecutionBody, InformationSourceTaskExecutionBody, DataSliceActionExecutionBody, + WeakSupervisionActionExecutionBody, ) from fastapi import APIRouter from util import daemon @@ -70,3 +72,21 @@ def data_slice( ) return SILENT_SUCCESS_RESPONSE + + +@router.post( + "/weak-supervision", +) +def weak_supervision( + weak_supervision_action_execution: WeakSupervisionActionExecutionBody, +): + project_id = weak_supervision_action_execution.project_id + user_id = weak_supervision_action_execution.user_id + + daemon.run( + weak_supervision_manager.run_weak_supervision, + project_id, + user_id, + ) + + return SILENT_SUCCESS_RESPONSE diff --git a/fast_api/routes/zero_shot.py b/fast_api/routes/zero_shot.py index 724605f6..e188e69b 100644 --- a/fast_api/routes/zero_shot.py +++ b/fast_api/routes/zero_shot.py @@ -8,7 +8,7 @@ from fast_api.routes.client_response import pack_json_result from controller.auth import manager as auth_manager from fastapi import APIRouter, Body, Depends, Request -from controller.task_queue import manager as task_queue_manager +from controller.task_master import manager as task_master_manager from controller.zero_shot import manager as zero_shot_manager from submodules.model.enums import TaskType from util import notification @@ -98,12 +98,14 @@ def init_zeroshot( 
project_id: str, heuristic_id: str, ): - user_id = auth_manager.get_user_id_by_info(request.state.info) - task_queue_manager.add_task( - project_id, + user = auth_manager.get_user_by_info(request.state.info) + + task_master_manager.queue_task( + user.organization_id, + user.id, TaskType.INFORMATION_SOURCE, - user_id, { + "project_id": project_id, "information_source_id": heuristic_id, }, ) From bd34f95c963f51607f976ca120d11fda97feb24f Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Wed, 7 Aug 2024 17:24:19 +0200 Subject: [PATCH 17/28] improve information source --- fast_api/routes/heuristic.py | 12 +++++++----- fast_api/routes/task_execution.py | 8 +++++--- submodules/model | 2 +- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/fast_api/routes/heuristic.py b/fast_api/routes/heuristic.py index 0e55dde1..c1b71e0b 100644 --- a/fast_api/routes/heuristic.py +++ b/fast_api/routes/heuristic.py @@ -195,18 +195,20 @@ def set_payload( information_source_item.type != enums.InformationSourceType.ZERO_SHOT.value ) - queue_id, _ = task_master_manager.queue_task( - org_id, - user.id, + task_master_response = task_master_manager.queue_task( + str(org_id), + str(user.id), enums.TaskType.INFORMATION_SOURCE, { - "project_id": project_id, - "information_source_id": heuristic_id, + "project_id": str(project_id), + "information_source_id": str(heuristic_id), "source_type": information_source_item.type, }, priority=priority, ) + + queue_id = task_master_response.json().get("task_id") return pack_json_result({"data": {"createPayload": {"queueId": queue_id}}}) diff --git a/fast_api/routes/task_execution.py b/fast_api/routes/task_execution.py index ea423771..bb17464a 100644 --- a/fast_api/routes/task_execution.py +++ b/fast_api/routes/task_execution.py @@ -44,17 +44,19 @@ def information_source( information_source_id = information_source_task_execution.information_source_id information_source_type = information_source_task_execution.information_source_type user_id = 
information_source_task_execution.user_id + payload_id = None # already threaded in managers if information_source_type == InformationSourceType.ZERO_SHOT.value: payload_id = zero_shot_manager.start_zero_shot_for_project_thread( project_id, information_source_id, user_id ) else: - payload_id = payload_manager.create_payload( + payload = payload_manager.create_payload( project_id, information_source_id, user_id ) - - return pack_json_result({"payload_id": payload_id}) + if payload: + payload_id = payload.id + return pack_json_result({"payload_id": str(payload_id)}, wrap_for_frontend=False) @router.post( diff --git a/submodules/model b/submodules/model index 978e1bfd..aaa1583c 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 978e1bfd6190afdf1e5432ad5101e05dff08f512 +Subproject commit aaa1583c784979dd63a0588c5def6096304970d3 From 25d6a81c90244b297277821de6adf0f5301f8918 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Wed, 7 Aug 2024 18:47:39 +0200 Subject: [PATCH 18/28] clean --- api/transfer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/transfer.py b/api/transfer.py index 147881cc..77088b8d 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -1,7 +1,7 @@ import logging import traceback import time -from typing import Optional, Dict +from typing import Optional from starlette.endpoints import HTTPEndpoint from starlette.responses import PlainTextResponse, JSONResponse from controller.embedding.manager import recreate_embeddings From b7c541d644df4f7ad5af7c6c1f12228b1b2ee7b0 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Wed, 7 Aug 2024 20:06:11 +0200 Subject: [PATCH 19/28] project id --- controller/transfer/cognition/import_wizard.py | 1 + 1 file changed, 1 insertion(+) diff --git a/controller/transfer/cognition/import_wizard.py b/controller/transfer/cognition/import_wizard.py index 7ac828ec..4df4f01c 100644 --- a/controller/transfer/cognition/import_wizard.py +++ 
b/controller/transfer/cognition/import_wizard.py @@ -668,6 +668,7 @@ def __add_embedding( "action": { "action_type": enums.TaskQueueAction.CREATE_OUTLIER_SLICE.value, "embedding_name": embedding_name, + "project_id": target_project_id, }, } ) From 0a08a62ae653db54b362a1b512158e41ab135bfa Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Thu, 8 Aug 2024 11:41:09 +0200 Subject: [PATCH 20/28] model --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index 663c7af4..912e0728 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 663c7af4ca06295aeefd2b19a37e60c6467c7986 +Subproject commit 912e07285af479d142892f8738b56884d5277be5 From cdfb760f2d49ff4014a61faa89b43f46fe68d11e Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Thu, 8 Aug 2024 12:13:33 +0200 Subject: [PATCH 21/28] task deletion --- controller/task_master/manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/controller/task_master/manager.py b/controller/task_master/manager.py index 4fa5216d..de003783 100644 --- a/controller/task_master/manager.py +++ b/controller/task_master/manager.py @@ -26,5 +26,6 @@ def queue_task( def delete_task(org_id: str, task_id: str) -> requests.Response: return requests.delete( - f"{TASK_MASTER_URL}/task/queue", json={"orgId": org_id, "taskId": task_id} + f"{TASK_MASTER_URL}/task/queue", + json={"orgId": str(org_id), "taskId": str(task_id)}, ) From 9f0c1875008564daab1bc6d7e4abea5b08ce74a6 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Thu, 8 Aug 2024 14:58:19 +0200 Subject: [PATCH 22/28] replace alembic + model --- .../881102ae15f8_rework_task_queue.py | 61 +++++++++++++++++++ ...5c30d91685_remove_project_id_task_queue.py | 51 ---------------- submodules/model | 2 +- 3 files changed, 62 insertions(+), 52 deletions(-) create mode 100644 alembic/versions/881102ae15f8_rework_task_queue.py delete mode 100644 
alembic/versions/af5c30d91685_remove_project_id_task_queue.py diff --git a/alembic/versions/881102ae15f8_rework_task_queue.py b/alembic/versions/881102ae15f8_rework_task_queue.py new file mode 100644 index 00000000..b3dd6830 --- /dev/null +++ b/alembic/versions/881102ae15f8_rework_task_queue.py @@ -0,0 +1,61 @@ +"""rework task queue + +Revision ID: 881102ae15f8 +Revises: 37d138040614 +Create Date: 2024-08-08 12:57:27.648167 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = '881102ae15f8' +down_revision = '37d138040614' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('task_queue', + sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('organization_id', postgresql.UUID(as_uuid=True), nullable=True), + sa.Column('task_type', sa.String(), nullable=True), + sa.Column('task_info', sa.JSON(), nullable=True), + sa.Column('priority', sa.Boolean(), nullable=True), + sa.Column('is_active', sa.Boolean(), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('created_by', postgresql.UUID(as_uuid=True), nullable=True), + sa.ForeignKeyConstraint(['created_by'], ['user.id'], ondelete='SET NULL'), + sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id'), + schema='global' + ) + op.create_index(op.f('ix_global_task_queue_created_by'), 'task_queue', ['created_by'], unique=False, schema='global') + op.create_index(op.f('ix_global_task_queue_organization_id'), 'task_queue', ['organization_id'], unique=False, schema='global') + op.drop_index('ix_task_queue_created_by', table_name='task_queue') + op.drop_table('task_queue') + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table('task_queue', + sa.Column('id', postgresql.UUID(), autoincrement=False, nullable=False), + sa.Column('project_id', postgresql.UUID(), autoincrement=False, nullable=True), + sa.Column('task_type', sa.VARCHAR(), autoincrement=False, nullable=True), + sa.Column('task_info', postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=True), + sa.Column('priority', sa.BOOLEAN(), autoincrement=False, nullable=True), + sa.Column('is_active', sa.BOOLEAN(), autoincrement=False, nullable=True), + sa.Column('created_at', postgresql.TIMESTAMP(), autoincrement=False, nullable=True), + sa.Column('created_by', postgresql.UUID(), autoincrement=False, nullable=True), + sa.ForeignKeyConstraint(['created_by'], ['user.id'], name='task_queue_created_by_fkey', ondelete='SET NULL'), + sa.ForeignKeyConstraint(['project_id'], ['project.id'], name='task_queue_project_id_fkey', ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id', name='task_queue_pkey') + ) + op.create_index('ix_task_queue_created_by', 'task_queue', ['created_by'], unique=False) + op.drop_index(op.f('ix_global_task_queue_organization_id'), table_name='task_queue', schema='global') + op.drop_index(op.f('ix_global_task_queue_created_by'), table_name='task_queue', schema='global') + op.drop_table('task_queue', schema='global') + # ### end Alembic commands ### diff --git a/alembic/versions/af5c30d91685_remove_project_id_task_queue.py b/alembic/versions/af5c30d91685_remove_project_id_task_queue.py deleted file mode 100644 index 17169c2e..00000000 --- a/alembic/versions/af5c30d91685_remove_project_id_task_queue.py +++ /dev/null @@ -1,51 +0,0 @@ -"""remove project id task queue - -Revision ID: af5c30d91685 -Revises: 0d587af700ce -Create Date: 2024-08-04 15:42:26.747671 - -""" -from alembic import op -import sqlalchemy as sa -from sqlalchemy.dialects import postgresql - -# revision identifiers, used by Alembic. 
-revision = 'af5c30d91685' -down_revision = '0d587af700ce' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.create_table('macro_execution_summary', - sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False), - sa.Column('organization_id', postgresql.UUID(as_uuid=True), nullable=True), - sa.Column('creation_month', sa.Date(), nullable=True), - sa.Column('macro_type', sa.String(), nullable=True), - sa.Column('execution_count', sa.Integer(), nullable=True), - sa.Column('processed_files_count', sa.Integer(), nullable=True), - sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ondelete='CASCADE'), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('organization_id', 'creation_month', 'macro_type', name='unique_macro_summary'), - schema='cognition' - ) - op.create_index(op.f('ix_cognition_macro_execution_summary_creation_month'), 'macro_execution_summary', ['creation_month'], unique=False, schema='cognition') - op.create_index(op.f('ix_cognition_macro_execution_summary_organization_id'), 'macro_execution_summary', ['organization_id'], unique=False, schema='cognition') - op.add_column('task_queue', sa.Column('organization_id', postgresql.UUID(as_uuid=True), nullable=True)) - op.drop_constraint('task_queue_project_id_fkey', 'task_queue', type_='foreignkey') - op.create_foreign_key(None, 'task_queue', 'organization', ['organization_id'], ['id'], ondelete='CASCADE') - op.drop_column('task_queue', 'project_id') - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! 
### - op.add_column('task_queue', sa.Column('project_id', postgresql.UUID(), autoincrement=False, nullable=True)) - op.drop_constraint(None, 'task_queue', type_='foreignkey') - op.create_foreign_key('task_queue_project_id_fkey', 'task_queue', 'project', ['project_id'], ['id'], ondelete='CASCADE') - op.drop_column('task_queue', 'organization_id') - op.drop_index(op.f('ix_cognition_macro_execution_summary_organization_id'), table_name='macro_execution_summary', schema='cognition') - op.drop_index(op.f('ix_cognition_macro_execution_summary_creation_month'), table_name='macro_execution_summary', schema='cognition') - op.drop_table('macro_execution_summary', schema='cognition') - # ### end Alembic commands ### diff --git a/submodules/model b/submodules/model index 912e0728..f258447e 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 912e07285af479d142892f8738b56884d5277be5 +Subproject commit f258447e1c37a158d3dddcae3428e336a2cafd55 From 152b05ac5c7d09887f2c3523ad23955be3ff3735 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Mon, 12 Aug 2024 10:43:29 +0200 Subject: [PATCH 23/28] PR updates --- api/transfer.py | 2 +- controller/information_source/manager.py | 3 -- .../transfer/cognition/import_wizard.py | 22 ++++++------- fast_api/routes/heuristic.py | 5 +-- fast_api/routes/task_execution.py | 33 ++++++++++--------- start | 2 -- submodules/model | 2 +- 7 files changed, 33 insertions(+), 36 deletions(-) diff --git a/api/transfer.py b/api/transfer.py index 77088b8d..489186ee 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -324,7 +324,7 @@ def init_file_import(task: UploadTask, project_id: str, is_global_update: bool) if task.file_type != "knowledge_base": only_usable_attributes = task.file_type == "records_add" project_item = project.get(project_id) - org_id = str(project_item.organization_id) + org_id = project_item.organization_id task_master_manager.queue_task( str(org_id), str(task.user_id), diff --git 
a/controller/information_source/manager.py b/controller/information_source/manager.py index 5a880a97..1fd72691 100644 --- a/controller/information_source/manager.py +++ b/controller/information_source/manager.py @@ -4,7 +4,6 @@ from controller.information_source.util import resolve_source_return_type from submodules.model import InformationSource, LabelingTask, enums from submodules.model.business_objects import ( - general, labeling_task, information_source, payload, @@ -12,7 +11,6 @@ from controller.misc import config_service from controller.labeling_access_link import manager as link_manager from controller.record_label_association import manager as rla_manager -from controller.payload import manager as payload_manager from util import daemon @@ -169,4 +167,3 @@ def set_all_model_callbacks_selected(project_id: str, value: bool) -> None: information_source.update_is_selected_for_project( project_id, value, with_commit=True, is_model_callback=True ) - diff --git a/controller/transfer/cognition/import_wizard.py b/controller/transfer/cognition/import_wizard.py index 4df4f01c..7a12067f 100644 --- a/controller/transfer/cognition/import_wizard.py +++ b/controller/transfer/cognition/import_wizard.py @@ -219,22 +219,22 @@ def __finalize_setup( enums.TaskType.TASK_QUEUE, {"project_id": cognition_project_id, "task_list": task_list}, ) - queue_info = queue_response.json() - print(queue_info, flush=True) - task_id = queue_info["task_id"] - position = queue_info.get("position") + if queue_response.ok: + queue_info = queue_response.json() + task_id = queue_info["task_id"] + position = queue_info.get("position") - notification.send_organization_update( - cognition_project_id, - f"cognition_wizard:task_queue:{task_id}:{len(task_list)}", - organization_id=organization_id, - ) - if position: notification.send_organization_update( cognition_project_id, - f"task_queue:{str(task_id)}:QUEUE_POSITION:{position}", + f"cognition_wizard:task_queue:{task_id}:{len(task_list)}", 
organization_id=organization_id, ) + if position: + notification.send_organization_update( + cognition_project_id, + f"task_queue:{str(task_id)}:QUEUE_POSITION:{position}", + organization_id=organization_id, + ) # function called from queue as last entry diff --git a/fast_api/routes/heuristic.py b/fast_api/routes/heuristic.py index c1b71e0b..89131bbf 100644 --- a/fast_api/routes/heuristic.py +++ b/fast_api/routes/heuristic.py @@ -206,8 +206,9 @@ def set_payload( }, priority=priority, ) - - queue_id = task_master_response.json().get("task_id") + queue_id = None + if task_master_response.ok: + queue_id = task_master_response.json().get("task_id") return pack_json_result({"data": {"createPayload": {"queueId": queue_id}}}) diff --git a/fast_api/routes/task_execution.py b/fast_api/routes/task_execution.py index bb17464a..0d75d927 100644 --- a/fast_api/routes/task_execution.py +++ b/fast_api/routes/task_execution.py @@ -40,19 +40,22 @@ def information_source( information_source_task_execution: InformationSourceTaskExecutionBody, ): - project_id = information_source_task_execution.project_id - information_source_id = information_source_task_execution.information_source_id - information_source_type = information_source_task_execution.information_source_type - user_id = information_source_task_execution.user_id - payload_id = None +    payload_id = None # already threaded in managers - if information_source_type == InformationSourceType.ZERO_SHOT.value: + if ( + information_source_task_execution.information_source_type + == InformationSourceType.ZERO_SHOT.value + ): payload_id = zero_shot_manager.start_zero_shot_for_project_thread( - project_id, information_source_id, user_id + information_source_task_execution.project_id, + information_source_task_execution.information_source_id, + information_source_task_execution.user_id, ) else: payload = payload_manager.create_payload( - project_id, information_source_id, user_id + information_source_task_execution.project_id, 
information_source_task_execution.information_source_id, + information_source_task_execution.user_id, ) if payload: payload_id = payload.id @@ -65,12 +68,12 @@ def information_source( def data_slice( data_slice_action_execution: DataSliceActionExecutionBody, ): - project_id = data_slice_action_execution.project_id - embedding_id = data_slice_action_execution.embedding_id - user_id = data_slice_action_execution.user_id daemon.run( - data_slice_manager.create_outlier_slice, project_id, user_id, str(embedding_id) + data_slice_manager.create_outlier_slice, + data_slice_action_execution.project_id, + data_slice_action_execution.user_id, + data_slice_action_execution.embedding_id, ) return SILENT_SUCCESS_RESPONSE @@ -82,13 +85,11 @@ def data_slice( def weak_supervision( weak_supervision_action_execution: WeakSupervisionActionExecutionBody, ): - project_id = weak_supervision_action_execution.project_id - user_id = weak_supervision_action_execution.user_id daemon.run( weak_supervision_manager.run_weak_supervision, - project_id, - user_id, + weak_supervision_action_execution.project_id, + weak_supervision_action_execution.user_id, ) return SILENT_SUCCESS_RESPONSE diff --git a/start b/start index 7461c5ec..1bb726b3 100755 --- a/start +++ b/start @@ -74,8 +74,6 @@ docker run -d --rm \ -e COGNITION_GATEWAY=http://cognition-gateway:80 \ -e KRATOS_ADMIN_URL=http://kratos:4434 \ -e TASK_MASTER=http://cognition-task-master:80 \ --e TASK_QUEUE_SLOTS=1 \ --e PRIORITY_TASK_QUEUE_SLOTS=1 \ -e INFERENCE_DIR=$INFERENCE_DIR \ -e SECRET_KEY=default \ -e POSTGRES_POOL_USE_LIFO=x \ diff --git a/submodules/model b/submodules/model index f258447e..ea40d13f 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit f258447e1c37a158d3dddcae3428e336a2cafd55 +Subproject commit ea40d13f58c8db20e3d093351ec2c5d19558d9c5 From ed0d0cce0203e0ae67e474d63de22adcc24887aa Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Mon, 12 Aug 2024 11:07:28 +0200 Subject: [PATCH 24/28] 
blank lines --- controller/upload_task/manager.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/controller/upload_task/manager.py b/controller/upload_task/manager.py index a791f09f..688fa944 100644 --- a/controller/upload_task/manager.py +++ b/controller/upload_task/manager.py @@ -39,7 +39,7 @@ def create_upload_task( file_type: str, file_import_options: str, upload_type: str, - key: str + key: str, ) -> UploadTask: task = upload_task.create( user_id, @@ -61,7 +61,6 @@ def update_upload_task_to_finished(task: UploadTask) -> None: ) - def update_task( project_id: str, task_id: str, From af28a13afa76b2a788dffdc809056008c635898d Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Mon, 12 Aug 2024 11:30:43 +0200 Subject: [PATCH 25/28] clean --- controller/upload_task/manager.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/controller/upload_task/manager.py b/controller/upload_task/manager.py index 688fa944..c2994291 100644 --- a/controller/upload_task/manager.py +++ b/controller/upload_task/manager.py @@ -1,11 +1,10 @@ import logging -from typing import ByteString, Optional - +from typing import Optional from controller.transfer.util import ( get_upload_task_message as get_upload_task_message_orig, ) from submodules.model import UploadTask, enums -from submodules.model.business_objects import upload_task, general +from submodules.model.business_objects import upload_task from util import notification logger = logging.getLogger(__name__) From 99b203002efb1640efcc5063f5d550d11c5719f2 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Mon, 12 Aug 2024 11:41:47 +0200 Subject: [PATCH 26/28] model merge --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index ea40d13f..1babee7e 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit ea40d13f58c8db20e3d093351ec2c5d19558d9c5 +Subproject commit 
1babee7ea7ca2621f05c6df3a397c6b9416baab7 From 573829d551dc3b100664ea47bc64b8b224e7776c Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Mon, 12 Aug 2024 16:33:20 +0200 Subject: [PATCH 27/28] test --- app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app.py b/app.py index 39e59486..95e99c8d 100644 --- a/app.py +++ b/app.py @@ -145,7 +145,7 @@ Mount("/api", app=fastapi_app, name="REST API"), Mount( "/internal/api", app=fastapi_app_internal, name="INTERNAL REST API" - ), # task master requesting + ), # task master requests ] From 1e8eef4d5090a33e3d51a6252f6ebcfd93f9b43d Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Mon, 12 Aug 2024 16:58:33 +0200 Subject: [PATCH 28/28] model dev --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index 1babee7e..6a0b51a5 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 1babee7ea7ca2621f05c6df3a397c6b9416baab7 +Subproject commit 6a0b51a5e7cf88d58cb258a6c9014c4644626faa