diff --git a/alembic/versions/881102ae15f8_rework_task_queue.py b/alembic/versions/881102ae15f8_rework_task_queue.py new file mode 100644 index 00000000..b3dd6830 --- /dev/null +++ b/alembic/versions/881102ae15f8_rework_task_queue.py @@ -0,0 +1,61 @@ +"""rework task queue + +Revision ID: 881102ae15f8 +Revises: 37d138040614 +Create Date: 2024-08-08 12:57:27.648167 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = '881102ae15f8' +down_revision = '37d138040614' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('task_queue', + sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('organization_id', postgresql.UUID(as_uuid=True), nullable=True), + sa.Column('task_type', sa.String(), nullable=True), + sa.Column('task_info', sa.JSON(), nullable=True), + sa.Column('priority', sa.Boolean(), nullable=True), + sa.Column('is_active', sa.Boolean(), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('created_by', postgresql.UUID(as_uuid=True), nullable=True), + sa.ForeignKeyConstraint(['created_by'], ['user.id'], ondelete='SET NULL'), + sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id'), + schema='global' + ) + op.create_index(op.f('ix_global_task_queue_created_by'), 'task_queue', ['created_by'], unique=False, schema='global') + op.create_index(op.f('ix_global_task_queue_organization_id'), 'task_queue', ['organization_id'], unique=False, schema='global') + op.drop_index('ix_task_queue_created_by', table_name='task_queue') + op.drop_table('task_queue') + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table('task_queue', + sa.Column('id', postgresql.UUID(), autoincrement=False, nullable=False), + sa.Column('project_id', postgresql.UUID(), autoincrement=False, nullable=True), + sa.Column('task_type', sa.VARCHAR(), autoincrement=False, nullable=True), + sa.Column('task_info', postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=True), + sa.Column('priority', sa.BOOLEAN(), autoincrement=False, nullable=True), + sa.Column('is_active', sa.BOOLEAN(), autoincrement=False, nullable=True), + sa.Column('created_at', postgresql.TIMESTAMP(), autoincrement=False, nullable=True), + sa.Column('created_by', postgresql.UUID(), autoincrement=False, nullable=True), + sa.ForeignKeyConstraint(['created_by'], ['user.id'], name='task_queue_created_by_fkey', ondelete='SET NULL'), + sa.ForeignKeyConstraint(['project_id'], ['project.id'], name='task_queue_project_id_fkey', ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id', name='task_queue_pkey') + ) + op.create_index('ix_task_queue_created_by', 'task_queue', ['created_by'], unique=False) + op.drop_index(op.f('ix_global_task_queue_organization_id'), table_name='task_queue', schema='global') + op.drop_index(op.f('ix_global_task_queue_created_by'), table_name='task_queue', schema='global') + op.drop_table('task_queue', schema='global') + # ### end Alembic commands ### diff --git a/api/project.py b/api/project.py index 0dc5f2d5..6e0b1cbc 100644 --- a/api/project.py +++ b/api/project.py @@ -10,7 +10,7 @@ from submodules.model import events from util import doc_ock, notification, adapter -from controller.task_queue import manager as task_queue_manager +from controller.task_master import manager as task_master_manager from submodules.model.enums import TaskType, RecordTokenizationScope logging.basicConfig(level=logging.DEBUG) @@ -71,17 +71,22 @@ async def post(self, request_body) -> JSONResponse: adapter.check(data, project.id, user.id) project_manager.add_workflow_store_data_to_project( - user_id=user.id, project_id=project.id, file_name=name, data=data + user_id=user.id, + project_id=project.id, + org_id=project.organization_id, + file_name=name, + data=data, ) - task_queue_manager.add_task( - str(project.id), - TaskType.TOKENIZATION, + task_master_manager.queue_task( + str(organization.id), str(user.id), + TaskType.TOKENIZATION, { "scope": RecordTokenizationScope.PROJECT.value, "include_rats": True, "only_uploaded_attributes": False, + "project_id": str(project.id), }, ) diff --git a/api/transfer.py b/api/transfer.py index 0b3bc9f0..489186ee 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -1,7 +1,7 @@ import logging import traceback import time -from typing import Optional, Dict +from typing import Optional from starlette.endpoints import HTTPEndpoint from starlette.responses import PlainTextResponse, JSONResponse from controller.embedding.manager import recreate_embeddings @@ -18,12 +18,11 @@ general, organization, tokenization, - project as refinery_project, + project, ) from submodules.model.cognition_objects import ( project as cognition_project, - macro as macro_db_bo, ) from controller.transfer import manager as transfer_manager @@ -41,7 +40,7 @@ from util import daemon, notification from controller.transfer.cognition.minio_upload import handle_cognition_file_upload -from controller.task_queue import manager as task_queue_manager +from controller.task_master import manager as task_master_manager from submodules.model.enums import TaskType, RecordTokenizationScope @@ -243,86 +242,6 @@ def put(self, request) -> 
PlainTextResponse: return PlainTextResponse("OK") -class CognitionParseMarkdownFile(HTTPEndpoint): - def post(self, request) -> PlainTextResponse: - refinery_project_id = request.path_params["project_id"] - refinery_project_item = refinery_project.get(refinery_project_id) - if not refinery_project_item: - return PlainTextResponse("Bad project id", status_code=400) - - dataset_id = request.path_params["dataset_id"] - file_id = request.path_params["file_id"] - - # via thread to ensure the endpoint returns immediately - - daemon.run( - CognitionParseMarkdownFile.__add_parse_markdown_file_thread, - refinery_project_id, - str(refinery_project_item.created_by), - { - "org_id": str(refinery_project_item.organization_id), - "dataset_id": dataset_id, - "file_id": file_id, - }, - ) - - return PlainTextResponse("OK") - - def __add_parse_markdown_file_thread( - project_id: str, user_id: str, task_info: Dict[str, str] - ): - - ctx_token = general.get_ctx_token() - try: - task_queue_manager.add_task( - project_id, TaskType.PARSE_MARKDOWN_FILE, user_id, task_info - ) - finally: - general.remove_and_refresh_session(ctx_token, False) - - -class CognitionStartMacroExecutionGroup(HTTPEndpoint): - def put(self, request) -> PlainTextResponse: - macro_id = request.path_params["macro_id"] - group_id = request.path_params["group_id"] - - execution_entries = macro_db_bo.get_all_macro_executions(macro_id, group_id) - - if len(execution_entries) == 0: - return PlainTextResponse("No executions found", status_code=400) - if not (cognition_prj_id := execution_entries[0].meta_info.get("project_id")): - return PlainTextResponse("No project id found", status_code=400) - cognition_prj = cognition_project.get(cognition_prj_id) - refinery_prj_id = str( - refinery_project.get_or_create_queue_project( - cognition_prj.organization_id, cognition_prj.created_by, True - ).id - ) - cached = {str(e.id): str(e.created_by) for e in execution_entries} - - def queue_tasks(): - token = general.get_ctx_token() - try: - for exec_id in cached: - task_queue_manager.add_task( - refinery_prj_id, - TaskType.RUN_COGNITION_MACRO, - cached[exec_id], - { - "macro_id": macro_id, - "execution_id": exec_id, - "execution_group_id": group_id, - }, - ) - general.commit() - finally: - general.remove_and_refresh_session(token, False) - - daemon.run(queue_tasks) - - return PlainTextResponse("OK") - - class AssociationsImport(HTTPEndpoint): async def post(self, request) -> JSONResponse: project_id = request.path_params["project_id"] @@ -404,11 +323,14 @@ def init_file_import(task: UploadTask, project_id: str, is_global_update: bool) ) if task.file_type != "knowledge_base": only_usable_attributes = task.file_type == "records_add" - task_queue_manager.add_task( - project_id, + project_item = project.get(project_id) + org_id = project_item.organization_id + task_master_manager.queue_task( + str(org_id), + str(task.user_id), TaskType.TOKENIZATION, - task.user_id, { + "project_id": str(project_id), "scope": RecordTokenizationScope.PROJECT.value, "include_rats": True, "only_uploaded_attributes": only_usable_attributes, diff --git a/app.py b/app.py index 57619b2c..95e99c8d 100644 --- a/app.py +++ b/app.py @@ -15,8 +15,6 @@ UploadTaskInfo, CognitionImport, CognitionPrepareProject, - CognitionParseMarkdownFile, - CognitionStartMacroExecutionGroup, ) from fast_api.routes.organization import router as org_router from fast_api.routes.project import router as project_router @@ -36,12 +34,12 @@ from fast_api.routes.record import router as record_router from 
fast_api.routes.weak_supervision import router as weak_supervision_router from fast_api.routes.labeling_tasks import router as labeling_tasks_router +from fast_api.routes.task_execution import router as task_execution_router from middleware.database_session import handle_db_session from middleware.starlette_tmp_middleware import DatabaseSessionHandler from starlette.applications import Starlette from starlette.routing import Route, Mount -from controller.task_queue.task_queue import init_task_queues from controller.project.manager import check_in_deletion_projects from route_prefix import ( PREFIX_ORGANIZATION, @@ -62,6 +60,7 @@ PREFIX_RECORD, PREFIX_WEAK_SUPERVISION, PREFIX_LABELING_TASKS, + PREFIX_TASK_EXECUTION, ) from util import security, clean_up from middleware import log_storage @@ -116,6 +115,10 @@ labeling_tasks_router, prefix=PREFIX_LABELING_TASKS, tags=["labeling-tasks"] ) +fastapi_app_internal = FastAPI() +fastapi_app_internal.include_router( + task_execution_router, prefix=PREFIX_TASK_EXECUTION, tags=["task-execution"] +) routes = [ Route("/notify/{path:path}", Notify), Route("/healthcheck", Healthcheck), @@ -135,19 +138,14 @@ "/project/{cognition_project_id:str}/cognition/continue/{task_id:str}/finalize", CognitionPrepareProject, ), - Route( - "/project/{project_id:str}/cognition/datasets/{dataset_id:str}/files/{file_id:str}/queue", - CognitionParseMarkdownFile, - ), Route("/project/{project_id:str}/import/task/{task_id:str}", UploadTaskInfo), Route("/project", ProjectCreationFromWorkflow), - Route( - "/macro/{macro_id:str}/execution-group/{group_id:str}/queue", - CognitionStartMacroExecutionGroup, - ), Route("/is_managed", IsManagedRest), Route("/is_demo", IsDemoRest), Mount("/api", app=fastapi_app, name="REST API"), + Mount( + "/internal/api", app=fastapi_app_internal, name="INTERNAL REST API" + ), # task master requests ] @@ -156,7 +154,6 @@ middleware = [Middleware(DatabaseSessionHandler)] app = Starlette(routes=routes, middleware=middleware) -init_task_queues() check_in_deletion_projects() security.check_secret_key() clean_up.clean_up_database() diff --git a/controller/attribute/manager.py b/controller/attribute/manager.py index c5a8ded8..5850b2ca 100644 --- a/controller/attribute/manager.py +++ b/controller/attribute/manager.py @@ -18,7 +18,7 @@ ) from util import daemon, notification -from controller.task_queue import manager as task_queue_manager +from controller.task_master import manager as task_master_manager from submodules.model.enums import TaskType from . 
import util
 from sqlalchemy import sql
@@ -147,7 +147,11 @@ def delete_attribute(project_id: str, attribute_id: str) -> None:


 def add_running_id(
-    user_id: str, project_id: str, attribute_name: str, for_retokenization: bool = True
+    user_id: str,
+    org_id: str,
+    project_id: str,
+    attribute_name: str,
+    for_retokenization: bool = True,
 ) -> None:
     if attribute.get_by_name(project_id, attribute_name):
         raise ValueError(f"attribute with name {attribute_name} already exists")
@@ -155,35 +159,49 @@ def add_running_id(
     # added threading for session management because otherwise this can sometimes create a deadlock
     thread = daemon.prepare_thread(
-        __add_running_id, user_id, project_id, attribute_name, for_retokenization
+        __add_running_id,
+        user_id,
+        org_id,
+        project_id,
+        attribute_name,
+        for_retokenization,
     )
     thread.start()
     thread.join()


 def __add_running_id(
-    user_id: str, project_id: str, attribute_name: str, for_retokenization: bool = True
+    user_id: str,
+    org_id: str,
+    project_id: str,
+    attribute_name: str,
+    for_retokenization: bool = True,
 ):
     session_token = general.get_ctx_token()
     attribute.add_running_id(
         project_id, attribute_name, for_retokenization, with_commit=True
     )
     if for_retokenization:
-        task_queue_manager.add_task(
-            project_id,
+        task_master_manager.queue_task(
+            str(org_id),
+            str(user_id),
             TaskType.TOKENIZATION,
-            user_id,
             {
                 "scope": RecordTokenizationScope.PROJECT.value,
                 "include_rats": True,
                 "only_uploaded_attributes": False,
+                "project_id": str(project_id),
             },
         )
     general.remove_and_refresh_session(session_token)


 def calculate_user_attribute_all_records(
-    project_id: str, user_id: str, attribute_id: str, include_rats: bool = True
+    project_id: str,
+    org_id: str,
+    user_id: str,
+    attribute_id: str,
+    include_rats: bool = True,
 ) -> None:
     if attribute.get_all(
         project_id=project_id, state_filter=[AttributeState.RUNNING.value]
@@ -231,6 +249,7 @@ def calculate_user_attribute_all_records(
     daemon.run(
         __calculate_user_attribute_all_records,
         project_id,
+        org_id,
         user_id,
         attribute_id,
         include_rats,
@@ -238,7 +257,7 @@ def __calculate_user_attribute_all_records(
-    project_id: str, user_id: str, attribute_id: str, include_rats: bool
+    project_id: str, org_id: str, user_id: str, attribute_id: str, include_rats: bool
 ) -> None:
     session_token = general.get_ctx_token()
     try:
@@ -295,14 +314,15 @@ def __calculate_user_attribute_all_records(
             project_id, attribute_id, "Triggering tokenization."
         )
         try:
-            task_queue_manager.add_task(
-                project_id,
-                TaskType.TOKENIZATION,
-                user_id,
+            task_master_manager.queue_task(
+                str(org_id),
+                str(user_id),
+                TaskType.TOKENIZATION,
                 {
                     "scope": RecordTokenizationScope.ATTRIBUTE.value,
                     "attribute_id": str(attribute_item.id),
                     "include_rats": include_rats,
+                    "project_id": str(project_id),
                 },
             )
diff --git a/controller/gates/manager.py b/controller/gates/manager.py
deleted file mode 100644
index 326d992f..00000000
--- a/controller/gates/manager.py
+++ /dev/null
@@ -1,31 +0,0 @@
-from typing import List, Tuple
-
-from submodules.model.business_objects import (
-    information_source as refinery_information_source,
-    embedding as refinery_embedding,
-)
-from submodules.model.enums import PayloadState, EmbeddingState
-
-
-from . 
import gates_service
-
-
-def start_gates_container(
-    project_id: str, select_available_heuristics: bool = True
-) -> bool:
-    heuristics, embeddings = [], []
-    if select_available_heuristics:
-        heuristics, embeddings = __get_relevant_gates_ids(project_id)
-    return gates_service.start_gates_project(project_id, heuristics, embeddings)
-
-
-def __get_relevant_gates_ids(project_id: str) -> Tuple[List[str], List[str]]:
-    embeddings = [
-        str(e.id)
-        for e in refinery_embedding.get_all_embeddings_by_project_id(project_id)
-        if e.state == EmbeddingState.FINISHED.value
-    ]
-
-    states = refinery_information_source.get_all_states(project_id)
-    heuristics = [key for key in states if states[key] == PayloadState.FINISHED.value]
-    return heuristics, embeddings
diff --git a/controller/information_source/manager.py b/controller/information_source/manager.py
index 5a880a97..1fd72691 100644
--- a/controller/information_source/manager.py
+++ b/controller/information_source/manager.py
@@ -4,7 +4,6 @@
 from controller.information_source.util import resolve_source_return_type
 from submodules.model import InformationSource, LabelingTask, enums
 from submodules.model.business_objects import (
-    general,
     labeling_task,
     information_source,
     payload,
@@ -12,7 +11,6 @@
 from controller.misc import config_service
 from controller.labeling_access_link import manager as link_manager
 from controller.record_label_association import manager as rla_manager
-from controller.payload import manager as payload_manager
 from util import daemon
@@ -169,4 +167,3 @@ def set_all_model_callbacks_selected(project_id: str, value: bool) -> None:
     information_source.update_is_selected_for_project(
         project_id, value, with_commit=True, is_model_callback=True
     )
-
diff --git a/controller/project/manager.py b/controller/project/manager.py
index d6ade258..71c37bce 100644
--- a/controller/project/manager.py
+++ b/controller/project/manager.py
@@ -23,7 +23,7 @@
 )
 from fast_api.types import HuddleData, ProjectSize, GatesIntegrationData
 from util import daemon, notification
-from controller.task_queue import manager as task_queue_manager
+from controller.task_master import manager as task_master_manager
 from submodules.model.enums import TaskType, RecordTokenizationScope
 from submodules.model.business_objects import util as db_util
 from submodules.s3 import controller as s3
@@ -177,11 +177,12 @@ def import_sample_project(
     project_item = handler.import_sample_project(
         user_id, organization_id, name, project_type
     )
-    task_queue_manager.add_task(
-        str(project_item.id),
+    task_master_manager.queue_task(
+        str(organization_id),
+        str(user_id),
         TaskType.TOKENIZATION,
-        user_id,
         {
+            "project_id": str(project_item.id),
            "scope": RecordTokenizationScope.PROJECT.value,
            "include_rats": True,
            "only_uploaded_attributes": False,
@@ -314,7 +315,11 @@ def __get_first_data_id(project_id: str, user_id: str, huddle_type: str) -> str:


 def add_workflow_store_data_to_project(
-    project_id: str, user_id: str, file_name: str, data: List[Dict[str, Any]]
+    project_id: str,
+    org_id: str,
+    user_id: str,
+    file_name: str,
+    data: List[Dict[str, Any]],
 ):
     upload_task = upload_task_manager.create_upload_task(
         user_id=user_id,
@@ -331,7 +336,7 @@ def add_workflow_store_data_to_project(
         upload_task,
         enums.RecordCategory.SCALE.value,
     )
-    check_and_add_running_id(project_id, user_id)
+    check_and_add_running_id(project_id, org_id, user_id)
     upload_task_manager.update_upload_task_to_finished(upload_task)
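
The call sites above no longer enqueue work in-process; they post it to the external task master through the new controller/task_master/manager.py introduced in the next hunk. A minimal usage sketch of that module, assuming the TASK_MASTER environment variable points at the task master service; organization_id, user_id and project_id are placeholder values, not names from this diff:

    from controller.task_master import manager as task_master_manager
    from submodules.model.enums import TaskType, RecordTokenizationScope

    # Placeholders for illustration only.
    organization_id, user_id, project_id = "<org-uuid>", "<user-uuid>", "<project-uuid>"

    # The organization, not the project, now scopes the queue; the project id
    # travels inside task_info instead of being a positional argument.
    response = task_master_manager.queue_task(
        str(organization_id),
        str(user_id),
        TaskType.TOKENIZATION,
        {
            "scope": RecordTokenizationScope.PROJECT.value,
            "include_rats": True,
            "only_uploaded_attributes": False,
            "project_id": str(project_id),
        },
    )
    response.raise_for_status()  # queue_task returns the raw requests.Response

diff --git a/controller/task_queue/handler/__init__.py 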
b/controller/task_master/__init__.py similarity index 100% rename from controller/task_queue/handler/__init__.py rename to controller/task_master/__init__.py diff --git a/controller/task_master/manager.py b/controller/task_master/manager.py new file mode 100644 index 00000000..de003783 --- /dev/null +++ b/controller/task_master/manager.py @@ -0,0 +1,31 @@ +import requests +import os +from submodules.model import enums +from typing import Any, Dict + +TASK_MASTER_URL = os.getenv("TASK_MASTER") + + +def queue_task( + org_id: str, + user_id: str, + task_type: enums.TaskType, + task_info: Dict[str, Any], + priority: bool = False, +) -> requests.Response: + + task_payload = { + "orgId": org_id, + "userId": user_id, + "taskType": task_type.value, + "taskInfo": task_info, + "priority": priority, + } + return requests.put(f"{TASK_MASTER_URL}/task/queue", json=task_payload) + + +def delete_task(org_id: str, task_id: str) -> requests.Response: + return requests.delete( + f"{TASK_MASTER_URL}/task/queue", + json={"orgId": str(org_id), "taskId": str(task_id)}, + ) diff --git a/controller/task_queue/handler/attribute_calculation.py b/controller/task_queue/handler/attribute_calculation.py deleted file mode 100644 index f7b6d713..00000000 --- a/controller/task_queue/handler/attribute_calculation.py +++ /dev/null @@ -1,58 +0,0 @@ -from typing import Any, Dict, Tuple, Callable -from controller.attribute import manager as attribute_manager -from submodules.model.business_objects import ( - task_queue as task_queue_db_bo, - general, - attribute as attribute_db_bo, -) -from submodules.model.enums import AttributeState, DataTypes -from ..util import if_task_queue_send_websocket -from controller.tokenization.tokenization_service import ( - request_reupload_docbins, -) - - -def get_task_functions() -> Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 1 - - -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - project_id = task["project_id"] - attribute_id = task["task_info"]["attribute_id"] - # check attribute still exists - - attribute_item = attribute_db_bo.get(project_id, attribute_id) - if attribute_item is None: - return False - task_db_obj.is_active = True - general.commit() - if_task_queue_send_websocket( - task["task_info"], f"ATTRIBUTE:{attribute_id}:{attribute_item.name}" - ) - - attribute_manager.calculate_user_attribute_all_records( - project_id, task["created_by"], attribute_id - ) - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - project_id = task["project_id"] - attribute_id = task["task_info"]["attribute_id"] - attribute_item = attribute_db_bo.get(project_id, attribute_id) - if attribute_item is None: - return True - if attribute_item.state == AttributeState.FAILED.value: - return True - if attribute_item.state == AttributeState.USABLE.value: - if attribute_item.data_type == DataTypes.TEXT.value: - return attribute_db_bo.is_attribute_tokenization_finished( - project_id, attribute_id - ) - else: - request_reupload_docbins(project_id) - return True diff --git a/controller/task_queue/handler/embedding.py b/controller/task_queue/handler/embedding.py deleted file mode 100644 index f353f0ce..00000000 --- a/controller/task_queue/handler/embedding.py +++ /dev/null @@ -1,95 +0,0 @@ -from typing import Any, Dict, Tuple, Callable -from controller.embedding import manager as embedding_manager -from submodules.model import 
enums -from submodules.model.business_objects import ( - agreement as agreement_db_bo, - task_queue as task_queue_db_bo, - embedding as embedding_db_bo, - general, -) -from submodules.model.enums import EmbeddingState -from ..util import if_task_queue_send_websocket - -TASK_DONE_STATES = [EmbeddingState.FINISHED.value, EmbeddingState.FAILED.value] - - -def get_task_functions() -> Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 5 - - -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - project_id = task["project_id"] - - # check embedding already exists - embedding_item = embedding_db_bo.get_embedding_id_and_type( - project_id, task["task_info"]["embedding_name"] - ) - if embedding_item is not None: - task_queue_db_bo.remove_task_from_queue(project_id, task["id"], True) - return False - task_db_obj.is_active = True - general.commit() - - user_id = task["created_by"] - attribute_id = task["task_info"]["attribute_id"] - embedding_type = task["task_info"]["embedding_type"] - embedding_name = task["task_info"]["embedding_name"] - platform = task["task_info"]["platform"] - model = task["task_info"]["model"] - api_token = task["task_info"]["api_token"] - - terms_text = task["task_info"]["terms_text"] - terms_accepted = task["task_info"]["terms_accepted"] - - filter_attributes = task["task_info"]["filter_attributes"] - additional_data = task["task_info"]["additional_data"] - embedding_item = embedding_db_bo.create( - project_id, - attribute_id, - embedding_name, - user_id, - enums.EmbeddingState.INITIALIZING.value, - type=embedding_type, - model=model, - platform=platform, - api_token=api_token, - filter_attributes=filter_attributes, - additional_data=additional_data, - ) - if ( - platform == enums.EmbeddingPlatform.OPENAI.value - or platform == enums.EmbeddingPlatform.COHERE.value - or platform == enums.EmbeddingPlatform.AZURE.value - ): - agreement_db_bo.create( - project_id, - user_id, - terms_text, - terms_accepted, - xfkey=embedding_item.id, - xftype=enums.AgreementType.EMBEDDING.value, - ) - - general.commit() - embedding_id = str(embedding_item.id) - if_task_queue_send_websocket( - task["task_info"], f"EMBEDDING:{embedding_id}:{embedding_name}" - ) - - embedding_manager.create_embedding(project_id, embedding_id) - - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - embedding_item = embedding_db_bo.get_embedding_by_name( - task["project_id"], task["task_info"]["embedding_name"] - ) - # if it doesn't exists anymore, it means it was deleted -> we are done with the task - if embedding_item is None: - return True - return embedding_item.state in TASK_DONE_STATES diff --git a/controller/task_queue/handler/information_source.py b/controller/task_queue/handler/information_source.py deleted file mode 100644 index 826f2c7d..00000000 --- a/controller/task_queue/handler/information_source.py +++ /dev/null @@ -1,62 +0,0 @@ -from typing import Any, Dict, Tuple, Callable -from controller.payload import manager as payload_manager -from controller.zero_shot import manager as zero_shot_manager -from submodules.model.business_objects import ( - task_queue as task_queue_db_bo, - general, - information_source as information_source_db_bo, -) -from submodules.model.enums import PayloadState, InformationSourceType -from ..util import if_task_queue_send_websocket - -TASK_DONE_STATES = [PayloadState.FINISHED.value, 
PayloadState.FAILED.value] - - -def get_task_functions() -> Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 1 - - -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - project_id = task["project_id"] - information_source_id = task["task_info"]["information_source_id"] - # check information source still exists - - is_item = information_source_db_bo.get(project_id, information_source_id) - if is_item is None: - return False - task_db_obj.is_active = True - general.commit() - user_id = task["created_by"] - payload_id = None - if is_item.type == InformationSourceType.ZERO_SHOT.value: - payload_id = zero_shot_manager.start_zero_shot_for_project_thread( - project_id, information_source_id, user_id - ) - else: - payload = payload_manager.create_payload( - project_id, information_source_id, user_id - ) - payload_id = str(payload.id) - task["task_info"]["payload_id"] = payload_id - if_task_queue_send_websocket( - task["task_info"], - f"INFORMATION_SOURCE:{information_source_id}:{payload_id}:{is_item.name}", - ) - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - project_id = task["project_id"] - information_source_id = task["task_info"]["information_source_id"] - is_item = information_source_db_bo.get(project_id, information_source_id) - if is_item is None: - return True - payload_id = task["task_info"]["payload_id"] - payload_item = information_source_db_bo.get_payload(project_id, payload_id) - if payload_item is None: - return True - return payload_item.state in TASK_DONE_STATES diff --git a/controller/task_queue/handler/macro.py b/controller/task_queue/handler/macro.py deleted file mode 100644 index be094432..00000000 --- a/controller/task_queue/handler/macro.py +++ /dev/null @@ -1,44 +0,0 @@ -from typing import Any, Dict, Tuple, Callable -import os - -import requests -from submodules.model.business_objects import ( - task_queue as task_queue_db_bo, - general, -) - -from submodules.model.cognition_objects import macro as macro_db_bo - -BASE_URI = os.getenv("COGNITION_GATEWAY") - - -def get_task_functions() -> Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 1 - - -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - - task_db_obj.is_active = True - general.commit() - - action = task["task_info"] - - macro_id = action["macro_id"] - execution_id = action["execution_id"] - group_id = action.get("execution_group_id") - requests.put( - f"{BASE_URI}/api/v1/converters/internal/macros/{macro_id}/execution/{execution_id}/start?group_execution_id={group_id}" - ) - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - - action = task["task_info"] - return macro_db_bo.macro_execution_finished( - action["macro_id"], action["execution_id"], action["execution_group_id"] - ) diff --git a/controller/task_queue/handler/markdown_file.py b/controller/task_queue/handler/markdown_file.py deleted file mode 100644 index bfc43780..00000000 --- a/controller/task_queue/handler/markdown_file.py +++ /dev/null @@ -1,53 +0,0 @@ -from typing import Any, Dict, Tuple, Callable -import os - -import requests -from submodules.model.business_objects import ( - task_queue as task_queue_db_bo, - general, -) -from submodules.model.cognition_objects import ( - 
markdown_file as markdown_file_db_bo, -) -from submodules.model.enums import CognitionMarkdownFileState - -BASE_URI = os.getenv("COGNITION_GATEWAY") - -TASK_DONE_STATES = [ - CognitionMarkdownFileState.FINISHED.value, - CognitionMarkdownFileState.FAILED.value, -] - - -def get_task_functions() -> Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 1 - - -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - - action = task["task_info"] - org_id = action["org_id"] - dataset_id = action["dataset_id"] - file_id = action["file_id"] - - task_db_obj.is_active = True - general.commit() - requests.post( - f"{BASE_URI}/api/v1/converters/internal/datasets/{dataset_id}/files/{file_id}/parse", - json={"orgId": org_id}, - ) - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - action = task["task_info"] - org_id = action["org_id"] - file_id = action["file_id"] - markdown_file_entity = markdown_file_db_bo.get(org_id=org_id, md_file_id=file_id) - if markdown_file_entity is None: - return True - return markdown_file_entity.state in TASK_DONE_STATES diff --git a/controller/task_queue/handler/parse_cognition_tmp_file.py b/controller/task_queue/handler/parse_cognition_tmp_file.py deleted file mode 100644 index ca9329bb..00000000 --- a/controller/task_queue/handler/parse_cognition_tmp_file.py +++ /dev/null @@ -1,44 +0,0 @@ -from typing import Any, Dict, Tuple, Callable -import os -import submodules.s3.controller as s3 - -import requests -from submodules.model.business_objects import ( - task_queue as task_queue_db_bo, - general, -) - -BASE_URI = os.getenv("COGNITION_GATEWAY") - - -def get_task_functions() -> Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 1 - - -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - - task_db_obj.is_active = True - general.commit() - - action = task["task_info"] - conversation_id = action["conversation_id"] - cognition_project_id = action["cognition_project_id"] - requests.post( - f"{BASE_URI}/api/v1/converters/internal/projects/{cognition_project_id}/conversation/{conversation_id}/parse-tmp-file", - json={ - "minio_path": action["minio_path"], - "bucket": action["bucket"], - }, - ) - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - - action = task["task_info"] - - return not s3.object_exists(action["bucket"], action["minio_path"]) diff --git a/controller/task_queue/handler/task_queue.py b/controller/task_queue/handler/task_queue.py deleted file mode 100644 index 7fa54986..00000000 --- a/controller/task_queue/handler/task_queue.py +++ /dev/null @@ -1,106 +0,0 @@ -from typing import Any, Dict, Tuple, Callable -from submodules.model.business_objects import ( - task_queue as task_queue_db_bo, - project as project_db_bo, -) - -from util import notification - -from submodules.model.enums import TaskType, InformationSourceType - -from .. 
import manager - - -def get_task_functions() -> Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 1 - - -# task queues work a bit different from others since they are just wrapper around tasks in order -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - if len(task["task_info"]) == 0: - return False - if not isinstance(task["task_info"], list): - raise ValueError("Something is wrong") - task["initial_count"] = len(task["task_info"]) - task["done_count"] = 0 - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - project_id = task["project_id"] - if not project_db_bo.get(project_id): - # parent project was deleted so the queue item is done - return True - sub_task_id = task.get("sub_task_id") - if not sub_task_id: - # no sub task yet - __start_sub_task(task) - return False - if sub_task_id != TaskType.TASK_QUEUE_ACTION.value: - # special "id" for task queue actions since they don't have an actual id/db entry - sub_task_item = task_queue_db_bo.get(sub_task_id) - if sub_task_item: - # current sub task still running - return False - - if len(task["task_info"]) > 0: - # still further sub task items - task["done_count"] += 1 - __send_websocket_progress(task) - __start_sub_task(task) - return False - - __send_websocket_progress(task) - return True - - -def __start_sub_task(task: Dict[str, Any]) -> str: - user_id = task["created_by"] - - next_entry = task["task_info"].pop(0) - task_type = next_entry.get("task_type") - # if a specific project id is given for the item we use that otherwise the one from the task - # can e.g. happen with cognition init tasks - project_id = next_entry.get("project_id", task["project_id"]) - del next_entry["task_type"] - - try: - task_type_parsed = TaskType[task_type.upper()] - except KeyError: - raise ValueError(f"Invalid Task Type: {task_type}") - - priority = False - if task_type_parsed == TaskType.ATTRIBUTE_CALCULATION: - priority = True - elif task_type_parsed == TaskType.INFORMATION_SOURCE: - priority = ( - next_entry.get("source_type") != InformationSourceType.ZERO_SHOT.value - ) - elif task_type_parsed == TaskType.TASK_QUEUE_ACTION: - next_entry = next_entry.get("action") - next_entry["parent_task_queue_id"] = task["id"] - task_id, _ = manager.add_task( - project_id, task_type_parsed, user_id, next_entry, priority - ) - - task["sub_task_id"] = task_id - task_queue_db_bo.update_task_info(task["id"], task["task_info"], True) - - -def __send_websocket_progress(task: Dict[str, Any]) -> None: - project_id = task["project_id"] - task_id = task["id"] - if len(task["task_info"]) == 0: - notification.send_organization_update( - project_id, f"task_queue:{task_id}:state:DONE" - ) - else: - progress = round(task["done_count"] / task["initial_count"], 4) - - notification.send_organization_update( - project_id, f"task_queue:{task_id}:progress:{progress}" - ) diff --git a/controller/task_queue/handler/tokenization.py b/controller/task_queue/handler/tokenization.py deleted file mode 100644 index 7b6c8c7a..00000000 --- a/controller/task_queue/handler/tokenization.py +++ /dev/null @@ -1,56 +0,0 @@ -from typing import Any, Dict, Tuple, Callable -from submodules.model.business_objects import ( - task_queue as task_queue_db_bo, - general, - attribute as attribute_db_bo, -) -from controller.tokenization import tokenization_service -from submodules.model.business_objects.tokenization import ( - 
is_doc_bin_creation_running_or_queued, -) -from submodules.model.enums import RecordTokenizationScope -from ..util import if_task_queue_send_websocket - - -def get_task_functions() -> Tuple[Callable, Callable, int]: - return __start_task, __check_finished, 2 - - -def __start_task(task: Dict[str, Any]) -> bool: - # check task still relevant - task_db_obj = task_queue_db_bo.get(task["id"]) - if task_db_obj is None or task_db_obj.is_active: - return False - project_id = task["project_id"] - - if task["task_info"]["scope"] == RecordTokenizationScope.ATTRIBUTE.value: - attribute_item = attribute_db_bo.get( - project_id, task["task_info"]["attribute_id"] - ) - if attribute_item is None: - task_queue_db_bo.remove_task_from_queue(project_id, task["id"], True) - return False - task_db_obj.is_active = True - general.commit() - if_task_queue_send_websocket(task["task_info"], f"TOKENIZATION") - - if task["task_info"]["scope"] == RecordTokenizationScope.PROJECT.value: - tokenization_service.request_tokenize_project( - project_id, - task["created_by"], - task["task_info"]["include_rats"], - task["task_info"]["only_uploaded_attributes"], - ) - else: - tokenization_service.request_tokenize_calculated_attribute( - project_id, - task["created_by"], - task["task_info"]["attribute_id"], - task["task_info"]["include_rats"], - ) - - return True - - -def __check_finished(task: Dict[str, Any]) -> bool: - return not is_doc_bin_creation_running_or_queued(task["project_id"], True) diff --git a/controller/task_queue/manager.py b/controller/task_queue/manager.py index 1e66987e..2611e749 100644 --- a/controller/task_queue/manager.py +++ b/controller/task_queue/manager.py @@ -1,60 +1,10 @@ -from typing import Any, List, Dict, Tuple, Callable, Union +from typing import Any, List, Dict import copy - from submodules.model import enums - - from submodules.model.business_objects import ( task_queue as task_queue_db_bo, - embedding as embedding_db_bo, - information_source as information_source_db_bo, ) from submodules.model.models import TaskQueue as TaskQueueDBObj -from .handler import ( - embedding as embedding_handler, - information_source as information_source_handler, - tokenization as tokenization_handler, - attribute_calculation as attribute_calculation_handler, - task_queue as task_queue_handler, - markdown_file as markdown_file_handler, - parse_cognition_tmp_file as cognition_tmp_file_handler, - macro as macro_handler, -) -from .util import if_task_queue_send_websocket - -from controller.task_queue import task_queue -from controller.data_slice import manager as data_slice_manager -from controller.gates import manager as gates_manager -from controller.weak_supervision import manager as weak_supervision_manager -from controller.transfer.cognition.import_wizard import finish_cognition_setup -from util import notification, daemon - - -def add_task( - project_id: str, - task_type: enums.TaskType, - user_id: str, - task_info: Union[Dict[str, str], List[Dict[str, str]]], - priority: bool = False, -) -> Tuple[str, int]: - if task_type == enums.TaskType.TASK_QUEUE and not isinstance(task_info, list): - raise ValueError("Task queues only work with list of singular task items") - elif task_type == enums.TaskType.TASK_QUEUE and not len(task_info): - raise ValueError("Task queues need at least one item") - elif task_type != enums.TaskType.TASK_QUEUE and not isinstance(task_info, dict): - raise ValueError("Queue entries only accept dicts") - - if task_type == enums.TaskType.TASK_QUEUE_ACTION: - # just execute the action - 
__execute_action(project_id, user_id, task_info) - return enums.TaskType.TASK_QUEUE_ACTION.value, 0 - - task_item = task_queue_db_bo.add( - project_id, task_type, user_id, task_info, priority, with_commit=True - ) - task_id = str(task_item.id) - queue_position = add_task_to_task_queue(task_item) - return task_id, queue_position def get_all_waiting_by_type(project_id: str, task_type: str) -> List[TaskQueueDBObj]: @@ -79,86 +29,6 @@ def parse_task_to_dict(task: TaskQueueDBObj) -> Dict[str, Any]: } -def get_task_function_by_type(task_type: str) -> Tuple[Callable, Callable, int]: - if task_type == enums.TaskType.EMBEDDING.value: - return embedding_handler.get_task_functions() - if task_type == enums.TaskType.INFORMATION_SOURCE.value: - return information_source_handler.get_task_functions() - if task_type == enums.TaskType.TOKENIZATION.value: - return tokenization_handler.get_task_functions() - if task_type == enums.TaskType.ATTRIBUTE_CALCULATION.value: - return attribute_calculation_handler.get_task_functions() - if task_type == enums.TaskType.TASK_QUEUE.value: - return task_queue_handler.get_task_functions() - if task_type == enums.TaskType.PARSE_MARKDOWN_FILE.value: - return markdown_file_handler.get_task_functions() - if task_type == enums.TaskType.PARSE_COGNITION_TMP_FILE.value: - return cognition_tmp_file_handler.get_task_functions() - if task_type == enums.TaskType.RUN_COGNITION_MACRO.value: - return macro_handler.get_task_functions() - raise ValueError(f"Task type {task_type} not supported yet") - - -def add_task_to_task_queue(task: TaskQueueDBObj) -> int: - start_func, check_func, check_every = get_task_function_by_type(task.task_type) - queue = None - if ( - task.task_type == enums.TaskType.TASK_QUEUE.value - or task.task_type == enums.TaskType.RUN_COGNITION_MACRO.value - ): - # macros have tasks (e.g. etl parsing) inside so the execution shouldn't block own queue items - queue = task_queue.get_task_queue_queue() - else: - queue = task_queue.get_task_queue() - return queue.add_task(task, start_func, check_func, check_every) - - def remove_task_from_queue(project_id: str, task_id: str) -> None: task_queue_db_bo.remove_task_from_queue(project_id, task_id, with_commit=True) # no need to dequeue from tasks since the initial check should handle it - - -def __execute_action(project_id: str, user_id: str, action: Dict[str, Any]) -> None: - action_type = action.get("action_type") - if action_type == enums.TaskQueueAction.CREATE_OUTLIER_SLICE.value: - embedding_name = action.get("embedding_name") - embedding_item = embedding_db_bo.get_embedding_by_name( - project_id, embedding_name - ) - if not embedding_item: - raise ValueError(f"Unknown embedding {embedding_name}") - - data_slice_manager.create_outlier_slice( - project_id, user_id, str(embedding_item.id) - ) - elif action_type == enums.TaskQueueAction.START_GATES.value: - # id overwrite since a queue can hold different project ids (e.g. 
wizard) - project_id = action.get("project_id") - if not project_id: - raise ValueError("Missing project id") - if_task_queue_send_websocket( - action, f"{enums.TaskQueueAction.START_GATES.value}:{project_id}" - ) - state = enums.PayloadState.FINISHED.value - if not gates_manager.start_gates_container(project_id): - state = enums.PayloadState.FAILED.value - notification.send_organization_update(project_id, f"gates:startup:{state}") - - elif action_type == enums.TaskQueueAction.SEND_WEBSOCKET.value: - org_id = action.get("organization_id") - notification.send_organization_update( - project_id, action.get("message", ""), organization_id=org_id - ) - elif action_type == enums.TaskQueueAction.FINISH_COGNITION_SETUP.value: - daemon.run(finish_cognition_setup, action.get("cognition_project_id")) - elif action_type == enums.TaskQueueAction.RUN_WEAK_SUPERVISION.value: - information_source_db_bo.update_is_selected_for_project(project_id, False) - information_source_db_bo.update_is_selected_for_project( - project_id, - True, - with_commit=True, - only_with_state=enums.PayloadState.FINISHED, - ) - weak_supervision_manager.run_weak_supervision(project_id, user_id) - else: - raise ValueError(f"Invalid action type: {action_type}") diff --git a/controller/task_queue/task_queue.py b/controller/task_queue/task_queue.py deleted file mode 100644 index 81d6e5b9..00000000 --- a/controller/task_queue/task_queue.py +++ /dev/null @@ -1,242 +0,0 @@ -from threading import Lock, Thread -from typing import Tuple, Dict, Any, Callable -import os -import time -from submodules.model.models import TaskQueue as TaskQueueDBObj -from . import manager -from submodules.model.business_objects import general, task_queue as task_queue_db_bo -from submodules.model.cognition_objects import macro as macro_db_bo -import traceback -from submodules.model.enums import TaskType, MacroExecutionState - - -# custom class wrapping a list in order to make it thread safe -class ThreadSafeList: - def __init__(self): - self._list = list() - self._lock = Lock() - - def append(self, value): - with self._lock: - self._list.append(value) - - def extend(self, value): - with self._lock: - self._list.extend(value) - - def pop(self, index: int = -1): - with self._lock: - return self._list.pop(index) - - def get(self, index: int): - with self._lock: - return self._list[index] - - def length(self): - with self._lock: - return len(self._list) - - def print(self): - with self._lock: - return print(self._list, flush=True) - - -class CustomTaskQueue: - class TaskInfo: - def __init__( - self, - task: TaskQueueDBObj, - start_function: Callable[[Dict[str, Any]], bool], - check_finished_function: Callable[[Dict[str, Any]], bool], - check_every: int, - ) -> None: - self.task_dict = manager.parse_task_to_dict(task) - self.start_function = start_function - self.check_finished_function = check_finished_function - self.check_every = check_every - - def __init__(self, max_normal: int, max_priority: int) -> None: - self._lock = Lock() - self._max_normal = max_normal - self._fifo_queue_normal = ThreadSafeList() - self._active_normal = ThreadSafeList() - self._max_priority = max_priority - self._fifo_queue_priority = ThreadSafeList() - self._active_priority = ThreadSafeList() - self._checker_thread = Thread( - target=self.__thread_checker, - daemon=True, - ) - self._checker_thread.start() - - def add_task( - self, - task: TaskQueueDBObj, - start_function: Callable[[Dict[str, Any]], bool], - check_finished_function: Callable[[Dict[str, Any]], bool], - check_every: int = 1, 
- ) -> int: - # return queue position or 0 if directly started - with self._lock: - active = self._active_priority if task.priority else self._active_normal - max = self._max_priority if task.priority else self._max_normal - - task_info = self.TaskInfo( - task, start_function, check_finished_function, check_every - ) - add_to_prio = task.priority - # ensure prio tasks can be added to none prio queue if fits - if active.length() >= max: - if task.priority and self._active_normal.length() < self._max_normal: - add_to_prio = False - else: - append_to = ( - self._fifo_queue_priority - if task.priority - else self._fifo_queue_normal - ) - append_to.append(task_info) - return append_to.length() - self.__start_task(task_info, add_to_prio) - return 0 - - def __start_task(self, task_info: TaskInfo, to_prio: bool) -> bool: - if not task_info: - return False - - active = self._active_priority if to_prio else self._active_normal - # since multiple gateway container can exist the start can "fail" if the task is already active - try: - if not task_info.start_function(task_info.task_dict): - return False - active.append(task_info) - task_info.task_dict["is_active"] = True - except Exception: - print( - f"Task start of task failed (task info->{task_info.task_dict}) -> deleting from db...", - flush=True, - ) - task_queue_db_bo.remove_task_from_queue( - task_info.task_dict["project_id"], - task_info.task_dict["id"], - with_commit=True, - ) - - print("done...\nError:", flush=True) - print(traceback.format_exc(), flush=True) - return False - return True - - def __thread_checker(self) -> None: - ctx_token = general.get_ctx_token() - - seconds = 0 - while True: - seconds += 1 - if seconds >= 120: - seconds = 0 - ctx_token = general.remove_and_refresh_session(ctx_token, True) - try: - self.__check_task_queue(True, seconds) - self.__check_task_queue(False, seconds) - except Exception: - print(traceback.format_exc(), flush=True) - time.sleep(1) - - def __check_task_queue(self, priority: bool, seconds: int) -> None: - to_check = self._active_priority if priority else self._active_normal - - to_remove = [] - for idx in range(to_check.length()): - task = to_check.get(idx) - if seconds % task.check_every == 0: - if task.check_finished_function(task.task_dict): - task.task_dict["is_active"] = False - to_remove.append(idx) - - # cleanup & requeue existing tasks - for idx in reversed(to_remove): - finished = to_check.pop(idx) - task_queue_db_bo.remove_task_from_queue( - finished.task_dict["project_id"], finished.task_dict["id"] - ) - next_task, p = None, False - while not self.__start_task(next_task, p): - next_task, p = self.__get_next_item(priority) - if not next_task: - break - if len(to_remove) > 0: - general.commit() - - def __get_next_item(self, priority: bool) -> Tuple[TaskInfo, bool]: - # normal queue can solve priority tasks but not the other way around - if priority: - if self._fifo_queue_priority.length() > 0: - return self._fifo_queue_priority.pop(0), True - else: - if self._fifo_queue_normal.length() > 0: - return self._fifo_queue_normal.pop(0), False - if self._fifo_queue_priority.length() > 0: - return self._fifo_queue_priority.pop(0), False - return None, False - - -# global task queue -task_queue = None - -# thread wrapper for tasks item of type TASK_QUEUE so the queue doesn't block an actual calculation slot -task_queue_queue = None - - -def init_task_queues() -> CustomTaskQueue: - global task_queue, task_queue_queue - # init task queue class - max_normal = int(os.getenv("TASK_QUEUE_SLOTS", "2")) - 
max_priority = int(os.getenv("PRIORITY_TASK_QUEUE_SLOTS", "1"))
-    task_queue = CustomTaskQueue(max_normal, max_priority)
-    # double amount as normal so multiple queues can be handled simultaneously if the server can handle it
-    # tested locally with two wizards which resulted in reasonable performance (ping pong between the queues)
-    task_queue_queue = CustomTaskQueue(max_normal * 2, 0)
-    # reset old tasks that weren't finished properly
-    task_queue_db_bo.set_all_tasks_inactive(True)
-
-    # queue tasks
-    existing_tasks = task_queue_db_bo.get_all_tasks()
-    for task in existing_tasks:
-        start_func, check_func, check_every = manager.get_task_function_by_type(
-            task.task_type
-        )
-        if task.task_type == TaskType.TASK_QUEUE.value:
-            task_queue_queue.add_task(task, start_func, check_func, check_every)
-        elif task.task_type == TaskType.RUN_COGNITION_MACRO.value:
-            # macros that were running when the server was restarted are set to failed since we dont have pointers to the running process
-            item = macro_db_bo.get_macro_execution(
-                task.task_info["execution_id"],
-                task.task_info["execution_group_id"],
-                MacroExecutionState.RUNNING,
-            )
-            if item is not None:
-                item.state = MacroExecutionState.FAILED.value
-                task_queue_db_bo.remove_task_from_queue(task.project_id, task.id)
-                continue
-            task_queue_queue.add_task(task, start_func, check_func, check_every)
-        else:
-            task_queue.add_task(task, start_func, check_func, check_every)
-    general.commit()
-    return task_queue
-
-
-def get_task_queue() -> CustomTaskQueue:
-    global task_queue
-    if task_queue is None:
-        raise Exception("Task queue not initialized")
-
-    return task_queue
-
-
-def get_task_queue_queue() -> CustomTaskQueue:
-    global task_queue_queue
-    if task_queue_queue is None:
-        raise Exception("Task queue not initialized")
-
-    return task_queue_queue
diff --git a/controller/task_queue/util.py b/controller/task_queue/util.py
deleted file mode 100644
index 8da74ec6..00000000
--- a/controller/task_queue/util.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from util import notification
-
-from typing import Any, Dict
-
-
-def if_task_queue_send_websocket(
-    task: Dict[str, Any], msg: str, info_type: str = "START"
-) -> None:
-    task_queue_id = task.get("parent_task_queue_id")
-    if not task_queue_id:
-        return
-
-    project_id = task["project_id"]
-    notification.send_organization_update(
-        project_id, f"task_queue:{task_queue_id}:{info_type}:{msg}"
-    )
diff --git a/controller/tokenization/manager.py b/controller/tokenization/manager.py
index f16a1b9f..61cb4072 100644
--- a/controller/tokenization/manager.py
+++ b/controller/tokenization/manager.py
@@ -15,7 +15,7 @@
     request_tokenize_record,
 )
 import logging
-from controller.task_queue import manager as task_queue_manager
+from controller.task_master import manager as task_master_manager
 from submodules.model.enums import TaskType, RecordTokenizationScope

 logging.basicConfig(level=logging.INFO)
@@ -91,15 +91,16 @@ def start_record_tokenization(project_id: str, record_id: str) -> None:
     )


-def start_project_tokenization(project_id: str, user_id: str) -> None:
-    task_queue_manager.add_task(
-        project_id,
+def start_project_tokenization(project_id: str, org_id: str, user_id: str) -> None:
+    task_master_manager.queue_task(
+        str(org_id),
+        str(user_id),
         TaskType.TOKENIZATION,
-        user_id,
         {
             "scope": RecordTokenizationScope.PROJECT.value,
             "include_rats": True,
             "only_uploaded_attributes": False,
+            "project_id": str(project_id),
         },
     )
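
Note that queue_task hands back the raw requests.Response rather than the old (task_id, queue_position) tuple that add_task returned, so callers that still need those values must unpack the JSON reply themselves; the import wizard change in the next hunk does exactly that. A short sketch of the pattern, with org_id, user_id and task_info as placeholders; the reply field names "task_id" and "position" are taken from the wizard code below, since the task master's actual response schema is not part of this diff:

    queue_response = task_master_manager.queue_task(
        str(org_id), str(user_id), TaskType.TOKENIZATION, task_info
    )
    if queue_response.ok:
        queue_info = queue_response.json()
        task_id = queue_info["task_id"]  # id assigned by the task master
        position = queue_info.get("position")  # only present if the task had to wait

diff --git a/controller/transfer/cognition/import_wizard.py 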
b/controller/transfer/cognition/import_wizard.py
index 67988510..7a12067f 100644
--- a/controller/transfer/cognition/import_wizard.py
+++ b/controller/transfer/cognition/import_wizard.py
@@ -23,7 +23,7 @@
 from controller.labeling_task_label import manager as label_manager
 from controller.information_source import manager as information_source_manager
 from controller.attribute import manager as attribute_manager
-from controller.task_queue import manager as task_queue_manager
+from controller.task_master import manager as task_master_manager
 from controller.embedding import manager as embedding_manager
 from .bricks_loader import get_bricks_code_from_group, get_bricks_code_from_endpoint

@@ -34,6 +34,7 @@
     FREE_API_REQUEST_URL,
 )
 from .util import send_log_message
+import traceback


 class TokenRef:
@@ -53,6 +54,7 @@ def prepare_and_finalize_setup(cognition_project_id: str, task_id: str) -> None:
         __finalize_setup(token_ref, cognition_project_id, task_id)
     except Exception as e:
         print(f"Error during wizard setup: {str(e)}", flush=True)
+        print(traceback.format_exc())
     finally:
         token_ref.cleanup()

@@ -112,6 +114,7 @@ def __finalize_setup(
     __finalize_setup_for(
         CognitionProjects.REFERENCE,
         reference_project_id,
+        organization_id,
         user_id,
         project_language,
         file_additional_info,
@@ -127,6 +130,7 @@ def __finalize_setup(
     __finalize_setup_for(
         CognitionProjects.QUESTION,
         question_project_id,
+        organization_id,
         user_id,
         project_language,
         file_additional_info,
@@ -156,6 +160,7 @@ def __finalize_setup(
     __finalize_setup_for(
         CognitionProjects.RELEVANCE,
         relevance_project_id,
+        organization_id,
         user_id,
         project_language,
         file_additional_info,
@@ -168,7 +173,7 @@ def __finalize_setup(
         organization_id=organization_id,
     )

-    __add_start_gates_for(reference_project_id, task_list)
+    __add_start_gates_for(reference_project_id, organization_id, task_list)
     # currently disabled since not part of initial offering
     # __add_start_gates_for(question_project_id, task_list)

@@ -176,6 +181,7 @@ def __finalize_setup(
     task_list.append(
         {
+            "organization_id": organization_id,
             "project_id": reference_project_id,
             "task_type": enums.TaskType.TASK_QUEUE_ACTION.value,
             "action": {
@@ -207,21 +213,28 @@ def __finalize_setup(
             continue
         break

-    task_id, position = task_queue_manager.add_task(
-        reference_project_id, enums.TaskType.TASK_QUEUE, user_id, task_list
+    queue_response = task_master_manager.queue_task(
+        organization_id,
+        user_id,
+        enums.TaskType.TASK_QUEUE,
+        {"project_id": cognition_project_id, "task_list": task_list},
     )
+    if queue_response.ok:
+        queue_info = queue_response.json()
+        task_id = queue_info["task_id"]
+        position = queue_info.get("position")

-    notification.send_organization_update(
-        cognition_project_id,
-        f"cognition_wizard:task_queue:{task_id}:{len(task_list)}",
-        organization_id=organization_id,
-    )
-    if position:
         notification.send_organization_update(
             cognition_project_id,
-            f"task_queue:{str(task_id)}:QUEUE_POSITION:{position}",
+            f"cognition_wizard:task_queue:{task_id}:{len(task_list)}",
             organization_id=organization_id,
         )
+        if position:
+            notification.send_organization_update(
+                cognition_project_id,
+                f"task_queue:{str(task_id)}:QUEUE_POSITION:{position}",
+                organization_id=organization_id,
+            )


 # function called from queue as last entry
@@ -270,7 +283,7 @@ def __add_websocket_message_queue_item(
     action["organization_id"] = organization_id
     task_list.append(
         {
-            "project_id": sender_project_id,
+            "organization_id": organization_id,
             "task_type": enums.TaskType.TASK_QUEUE_ACTION.value,
             "action": action,
         }
     )
@@ -279,14 +292,16 @@ def __add_websocket_message_queue_item(

 def __add_weakly_supervise_all_valid(
     project_id: str,
+    org_id: str,
     task_list: List[Dict[str, str]],
 ) -> None:
     task_list.append(
         {
-            "project_id": project_id,
+            "organization_id": org_id,
             "task_type": enums.TaskType.TASK_QUEUE_ACTION.value,
             "action": {
                 "action_type": enums.TaskQueueAction.RUN_WEAK_SUPERVISION.value,
+                "project_id": project_id,
             },
         }
     )
@@ -294,11 +309,12 @@ def __add_weakly_supervise_all_valid(

 def __add_start_gates_for(
     project_id: str,
+    org_id: str,
     task_list: List[Dict[str, str]],
 ) -> None:
     task_list.append(
         {
-            "project_id": project_id,
+            "organization_id": org_id,
             "task_type": enums.TaskType.TASK_QUEUE_ACTION.value,
             "action": {
                 "action_type": enums.TaskQueueAction.START_GATES.value,
@@ -312,6 +328,7 @@
 def __finalize_setup_for(
     project_type: CognitionProjects,
     project_id: str,
+    org_id: str,
     user_id: str,
     project_language: str,
     file_additional_info: Dict[str, Any],
@@ -333,6 +350,7 @@ def __finalize_setup_for(
     if bricks:
         __load_ac_from_bricks_group(
             project_id,
+            org_id,
             bricks["group"],
             bricks.get("type", "generator"),
             bricks.get("type_lookup", {}),
@@ -355,6 +373,7 @@ def __finalize_setup_for(
             )
         __create_attribute_with(
             project_id,
+            org_id,
             code,
             attribute["name"],
             attribute["type"],
@@ -400,6 +419,7 @@ def __finalize_setup_for(
                 project_id,
                 labeling_task_id,
                 user_id,
+                org_id,
                 bricks["group"],
                 bricks.get("type", "classifier"),
                 target_data,
@@ -425,6 +445,7 @@ def __finalize_setup_for(
             filter_columns = []
         embedding_name = __add_embedding(
             project_id,
+            org_id,
             embedding.get("target", {}),
             project_language,
             filter_columns,
@@ -451,7 +472,7 @@ def __finalize_setup_for(
                 target_data,
                 name_prefix=bricks.get("function_prefix"),
             )
-        __add_weakly_supervise_all_valid(project_id, task_list)
+        __add_weakly_supervise_all_valid(project_id, org_id, task_list)

     token_ref.request_new()
@@ -482,6 +503,7 @@ def __load_lf_from_bricks_group(
     target_project_id: str,
     target_task_id: str,
     user_id: str,
+    org_id: str,
     group_key: str,
     bricks_type: str,
     target_data: Dict[str, str],
@@ -511,6 +533,7 @@ def __load_lf_from_bricks_group(
         if append_to_task_list:
             task_list.append(
                 {
+                    "organization_id": org_id,
                     "project_id": target_project_id,
                     "task_type": enums.TaskType.INFORMATION_SOURCE.value,
                     "information_source_id": str(item.id),
@@ -553,6 +576,7 @@ def __load_active_learner_from_bricks_group(

 def __load_ac_from_bricks_group(
     target_project_id: str,
+    org_id: str,
     group_key: str,
     bricks_type: str,
     data_type_lookup: Dict[str, str],
@@ -577,6 +601,7 @@ def __load_ac_from_bricks_group(
         )
         __create_attribute_with(
             target_project_id,
+            org_id,
             code,
             name,
             data_type,
@@ -587,6 +612,7 @@ def __load_ac_from_bricks_group(

 def __add_embedding(
     target_project_id: str,
+    org_id: str,
     target_info: Dict[str, str],
     project_language: str,
     filter_columns: List[str],
@@ -618,6 +644,7 @@ def __add_embedding(
     )
     task_list.append(
         {
+            "organization_id": org_id,
             "project_id": target_project_id,
             "task_type": enums.TaskType.EMBEDDING.value,
             "embedding_type": target_embedding_type,
@@ -635,11 +662,13 @@ def __add_embedding(
     if create_outlier_slice:
         task_list.append(
             {
+                "organization_id": org_id,
                 "project_id": target_project_id,
                 "task_type": enums.TaskType.TASK_QUEUE_ACTION.value,
                 "action": {
                     "action_type": enums.TaskQueueAction.CREATE_OUTLIER_SLICE.value,
                     "embedding_name": embedding_name,
+                    "project_id": target_project_id,
                 },
             }
         )
@@ -648,6 +677,7 @@

 def __create_attribute_with(
     project_id: str,
+    org_id: str,
     code: str,
     name: str,
     attribute_type: str,
@@ -663,6 +693,7 @@ def __create_attribute_with(
     if append_to_task_list:
         task_list.append(
             {
+                "organization_id": org_id,
                 "project_id": project_id,
                 "task_type": enums.TaskType.ATTRIBUTE_CALCULATION.value,
                 "attribute_id": attribute_id,
diff --git a/controller/transfer/cognition/minio_upload.py b/controller/transfer/cognition/minio_upload.py
index 4229f5ae..60b9827b 100644
--- a/controller/transfer/cognition/minio_upload.py
+++ b/controller/transfer/cognition/minio_upload.py
@@ -1,9 +1,8 @@
 from typing import List
 from submodules.model.cognition_objects import project as cognition_project
-from submodules.model.business_objects import project
 from submodules.model.cognition_objects import conversation
 from submodules.model.enums import TaskType
-from controller.task_queue import manager as task_queue_manager
+from controller.task_master import manager as task_master_manager


 def handle_cognition_file_upload(path_parts: List[str]):
@@ -18,26 +17,17 @@ def handle_cognition_file_upload(path_parts: List[str]):
     if not cognition_prj:
         return

-    project_id = None
-    if cognition_prj.refinery_references_project_id:
-        project_id = str(cognition_prj.refinery_references_project_id)
-    else:
-        project_id = str(
-            project.get_or_create_queue_project(
-                cognition_prj.organization_id, cognition_prj.created_by, True
-            ).id
-        )
     conversation_item = conversation.get(cognition_project_id, conversation_id)
     if not conversation_item:
         return

-    task_queue_manager.add_task(
-        project_id,
+    task_master_manager.queue_task(
+        str(cognition_prj.organization_id),
+        str(conversation_item.created_by),
         TaskType.PARSE_COGNITION_TMP_FILE,
-        conversation_item.created_by,
         {
-            "cognition_project_id": cognition_project_id,
-            "conversation_id": conversation_id,
+            "cognition_project_id": str(cognition_project_id),
+            "conversation_id": str(conversation_id),
             "minio_path": "/".join(path_parts[1:]),
             "bucket": path_parts[0],
         },
diff --git a/controller/transfer/manager.py b/controller/transfer/manager.py
index ce7416b5..c5245364 100644
--- a/controller/transfer/manager.py
+++ b/controller/transfer/manager.py
@@ -40,7 +40,7 @@
 from controller.labeling_task import manager as labeling_task_manager
 from controller.labeling_task_label import manager as labeling_task_label_manager
 from submodules.model.business_objects import record_label_association as rla
-from controller.task_queue import manager as task_queue_manager
+from controller.task_master import manager as task_master_manager
 from submodules.model.enums import TaskType, RecordTokenizationScope

@@ -140,7 +140,7 @@ def import_records_from_json(
     os.remove(file_path)


-def check_and_add_running_id(project_id: str, user_id: str):
+def check_and_add_running_id(project_id: str, org_id: str, user_id: str):
     attributes = attribute.get_all(project_id)
     add_running_id = True
     for att in attributes:
@@ -148,7 +148,9 @@
             add_running_id = False
             break
     if add_running_id:
-        attribute_manager.add_running_id(user_id, project_id, "running_id", False)
+        attribute_manager.add_running_id(
+            user_id, org_id, project_id, "running_id", False
+        )


 def import_project(project_id: str, task: UploadTask) -> None:
@@ -304,7 +306,7 @@ def generate_labelstudio_template(
     )


-def import_label_studio_file(project_id: str, upload_task_id: str) -> None:
+def import_label_studio_file(project_id: str, org_id: str, upload_task_id: str) -> None:
     ctx_token = general.get_ctx_token()
     try:
         if attribute.get_all(project_id):
@@ -312,14 +314,15 @@ def import_label_studio_file(project_id: str, org_id: str, upload_task_id: str) -> None:
         else:
             project_creation_manager.manage_data_import(project_id, upload_task_id)
         task = upload_task.get(project_id, upload_task_id)
-        task_queue_manager.add_task(
-            project_id,
-            TaskType.TOKENIZATION,
+        task_master_manager.queue_task(
+            str(org_id),
             str(task.user_id),
+            TaskType.TOKENIZATION,
             {
                 "scope": RecordTokenizationScope.PROJECT.value,
                 "include_rats": True,
                 "only_uploaded_attributes": False,
+                "project_id": str(project_id),
             },
         )
         upload_task.update(
diff --git a/controller/upload_task/manager.py b/controller/upload_task/manager.py
index a791f09f..c2994291 100644
--- a/controller/upload_task/manager.py
+++ b/controller/upload_task/manager.py
@@ -1,11 +1,10 @@
 import logging
-from typing import ByteString, Optional
-
+from typing import Optional
 from controller.transfer.util import (
     get_upload_task_message as get_upload_task_message_orig,
 )
 from submodules.model import UploadTask, enums
-from submodules.model.business_objects import upload_task, general
+from submodules.model.business_objects import upload_task
 from util import notification

 logger = logging.getLogger(__name__)
@@ -39,7 +38,7 @@ def create_upload_task(
     file_type: str,
     file_import_options: str,
     upload_type: str,
-    key: str
+    key: str,
 ) -> UploadTask:
     task = upload_task.create(
         user_id,
@@ -61,7 +60,6 @@ def update_upload_task_to_finished(task: UploadTask) -> None:
     )

-

 def update_task(
     project_id: str,
     task_id: str,
diff --git a/fast_api/models.py b/fast_api/models.py
index b40dc73d..fba41600 100644
--- a/fast_api/models.py
+++ b/fast_api/models.py
@@ -438,6 +438,31 @@ class CancelTaskBody(BaseModel):
     task_type: StrictStr


+class AttributeCalculationTaskExecutionBody(BaseModel):
+    organization_id: StrictStr
+    project_id: StrictStr
+    user_id: StrictStr
+    attribute_id: StrictStr
+
+
+class InformationSourceTaskExecutionBody(BaseModel):
+    project_id: StrictStr
+    information_source_id: StrictStr
+    information_source_type: StrictStr
+    user_id: StrictStr
+
+
+class DataSliceActionExecutionBody(BaseModel):
+    project_id: StrictStr
+    user_id: StrictStr
+    embedding_id: StrictStr
+
+
+class WeakSupervisionActionExecutionBody(BaseModel):
+    project_id: StrictStr
+    user_id: StrictStr
+
+
 class CreateCustomerButton(BaseModel):
     org_id: StrictStr
     type: CustomerButtonType
diff --git a/fast_api/routes/embedding.py b/fast_api/routes/embedding.py
index 364cf0ee..9f722f86 100644
--- a/fast_api/routes/embedding.py
+++ b/fast_api/routes/embedding.py
@@ -5,7 +5,7 @@
 from controller.misc import manager as misc
 from fastapi import APIRouter, Body, Depends, Request
 from controller.embedding import manager
-from controller.task_queue import manager as task_queue_manager
+from controller.task_master import manager as task_master_manager
 from controller.auth import manager as auth_manager
 from controller.embedding.connector import collection_on_qdrant
 from submodules.model.business_objects import project
@@ -114,10 +114,10 @@ def get_embeddings(project_id: str) -> List:
 )
 def delete_from_task_queue(
     request: Request,
-    project_id: str,
     task_id: str,
 ):
-    task_queue_manager.remove_task_from_queue(project_id, task_id)
+    org_id = auth_manager.get_user_by_info(request.state.info).organization_id
+    task_master_manager.delete_task(org_id, task_id)
     return pack_json_result({"data": {"deleteFromTaskQueue": {"ok": True}}})

@@ -166,11 +166,12 @@ def create_embedding(
         "version": body.config.get("version"),
     }

-    task_queue_manager.add_task(
-        project_id,
+    task_master_manager.queue_task(
+        str(user.organization_id),
+        str(user.id),
         TaskType.EMBEDDING,
-        user.id,
         {
+            "project_id": project_id,
             "embedding_type": embedding_type,
             "attribute_id": body.attribute_id,
             "embedding_name": manager.get_embedding_name(
diff --git a/fast_api/routes/heuristic.py b/fast_api/routes/heuristic.py
index 7a505ca0..89131bbf 100644
--- a/fast_api/routes/heuristic.py
+++ b/fast_api/routes/heuristic.py
@@ -11,7 +11,7 @@
 from submodules.model.enums import InformationSourceType
 from submodules.model.util import pack_edges_node, sql_alchemy_to_dict
 from util import notification
-from controller.task_queue import manager as task_queue_manager
+from controller.task_master import manager as task_master_manager
 from submodules.model import enums
 from controller.auth import kratos

@@ -187,6 +187,7 @@ def set_payload(
     heuristic_id: str,
 ):
     user = auth_manager.get_user_by_info(request.state.info)
+    org_id = user.organization_id
     information_source_item = information_source.get(project_id, heuristic_id)
     if information_source_item.type == enums.InformationSourceType.CROWD_LABELER.value:
         return pack_json_result({"data": {"createPayload": None}})
@@ -194,16 +195,20 @@ def set_payload(
         information_source_item.type != enums.InformationSourceType.ZERO_SHOT.value
     )

-    queue_id, _ = task_queue_manager.add_task(
-        project_id,
+    task_master_response = task_master_manager.queue_task(
+        str(org_id),
+        str(user.id),
         enums.TaskType.INFORMATION_SOURCE,
-        user.id,
         {
-            "information_source_id": heuristic_id,
+            "project_id": str(project_id),
+            "information_source_id": str(heuristic_id),
             "source_type": information_source_item.type,
         },
         priority=priority,
     )
+    queue_id = None
+    if task_master_response.ok:
+        queue_id = task_master_response.json().get("task_id")

     return pack_json_result({"data": {"createPayload": {"queueId": queue_id}}})
diff --git a/fast_api/routes/project_setting.py b/fast_api/routes/project_setting.py
index c0a46b98..79f32cd7 100644
--- a/fast_api/routes/project_setting.py
+++ b/fast_api/routes/project_setting.py
@@ -11,7 +11,6 @@
 from typing import Dict
 from fastapi.responses import JSONResponse

-from controller.task_queue import manager
 from controller.auth import manager as auth_manager
 from controller.transfer import manager as transfer_manager
 from controller.attribute import manager as attribute_manager
@@ -19,6 +18,8 @@
 from controller.labeling_task import manager as task_manager
 from controller.project import manager as project_manager
 from controller.record import manager as record_manager
+from controller.task_master import manager as task_master_manager
+from controller.task_queue import manager as task_queue_manager
 from fast_api.routes.client_response import pack_json_result
 from submodules.model.enums import TaskType
 from submodules.model.util import sql_alchemy_to_dict
@@ -55,7 +56,7 @@ def get_queued_tasks(
     project_id: str,
     task_type: str,
 ) -> Dict:
-    data = manager.get_all_waiting_by_type(project_id, task_type)
+    data = task_queue_manager.get_all_waiting_by_type(project_id, task_type)
     data_dict = sql_alchemy_to_dict(data, column_whitelist=QUEUED_TASKS_WHITELIST)
     return pack_json_result({"data": {"queuedTasks": data_dict}})

@@ -265,14 +266,16 @@ def calculate_user_attribute_all_records(
     project_id: str,
     body: CalculateUserAttributeAllRecordsBody = Body(...),
 ):
-    user_id = auth_manager.get_user_by_info(request.state.info).id
-
-    manager.add_task(
-        project_id,
+    user = auth_manager.get_user_by_info(request.state.info)
+    user_id = user.id
+    org_id = user.organization_id
+    task_master_manager.queue_task(
+        str(org_id),
+        str(user_id),
         TaskType.ATTRIBUTE_CALCULATION,
-        user_id,
         {
-            "attribute_id": body.attribute_id,
+            "project_id": str(project_id),
+            "attribute_id": str(body.attribute_id),
         },
         True,
     )
diff --git a/fast_api/routes/task_execution.py b/fast_api/routes/task_execution.py
new file mode 100644
index 00000000..0d75d927
--- /dev/null
+++ b/fast_api/routes/task_execution.py
@@ -0,0 +1,95 @@
+from controller.attribute import manager as attribute_manager
+from controller.zero_shot import manager as zero_shot_manager
+from controller.payload import manager as payload_manager
+from controller.data_slice import manager as data_slice_manager
+from controller.weak_supervision import manager as weak_supervision_manager
+from fast_api.models import (
+    AttributeCalculationTaskExecutionBody,
+    InformationSourceTaskExecutionBody,
+    DataSliceActionExecutionBody,
+    WeakSupervisionActionExecutionBody,
+)
+from fastapi import APIRouter
+from util import daemon
+from submodules.model.enums import InformationSourceType
+from fast_api.routes.client_response import pack_json_result, SILENT_SUCCESS_RESPONSE
+
+router = APIRouter()
+
+
+@router.post(
+    "/attribute-calculation",
+)
+def calculate_attributes(
+    attribute_calculation_task_execution: AttributeCalculationTaskExecutionBody,
+):
+    daemon.run(
+        attribute_manager.calculate_user_attribute_all_records,
+        attribute_calculation_task_execution.project_id,
+        attribute_calculation_task_execution.organization_id,
+        attribute_calculation_task_execution.user_id,
+        attribute_calculation_task_execution.attribute_id,
+    )
+
+    return SILENT_SUCCESS_RESPONSE
+
+
+@router.post(
+    "/information-source",
+)
+def information_source(
+    information_source_task_execution: InformationSourceTaskExecutionBody,
+):
+
+    # already threaded in managers
+    if (
+        information_source_task_execution.information_source_type
+        == InformationSourceType.ZERO_SHOT.value
+    ):
+        payload_id = zero_shot_manager.start_zero_shot_for_project_thread(
+            information_source_task_execution.project_id,
+            information_source_task_execution.information_source_id,
+            information_source_task_execution.user_id,
+        )
+    else:
+        payload = payload_manager.create_payload(
+            information_source_task_execution.project_id,
+            information_source_task_execution.information_source_id,
+            information_source_task_execution.user_id,
+        )
+        # create_payload may return None; keep payload_id bound either way
+        payload_id = payload.id if payload else None
+    return pack_json_result({"payload_id": str(payload_id)}, wrap_for_frontend=False)
+
+
+@router.post(
+    "/data-slice",
+)
+def data_slice(
+    data_slice_action_execution: DataSliceActionExecutionBody,
+):
+
+    daemon.run(
+        data_slice_manager.create_outlier_slice,
+        data_slice_action_execution.project_id,
+        data_slice_action_execution.user_id,
+        data_slice_action_execution.embedding_id,
+    )
+
+    return SILENT_SUCCESS_RESPONSE
+
+
+@router.post(
+    "/weak-supervision",
+)
+def weak_supervision(
+    weak_supervision_action_execution: WeakSupervisionActionExecutionBody,
+):
+
+    daemon.run(
+        weak_supervision_manager.run_weak_supervision,
+        weak_supervision_action_execution.project_id,
+        weak_supervision_action_execution.user_id,
+    )
+
+    return SILENT_SUCCESS_RESPONSE
diff --git a/fast_api/routes/zero_shot.py b/fast_api/routes/zero_shot.py
index 724605f6..e188e69b 100644
--- a/fast_api/routes/zero_shot.py
+++ b/fast_api/routes/zero_shot.py
@@ -8,7 +8,7 @@
 from fast_api.routes.client_response import pack_json_result
 from controller.auth import manager as auth_manager
 from fastapi import APIRouter, Body, Depends, Request
-from controller.task_queue import manager as task_queue_manager
+from controller.task_master import manager as task_master_manager
 from controller.zero_shot import manager as zero_shot_manager
 from submodules.model.enums import TaskType
 from util import notification
@@ -98,12 +98,14 @@ def init_zeroshot(
     project_id: str,
     heuristic_id: str,
 ):
-    user_id = auth_manager.get_user_id_by_info(request.state.info)
-    task_queue_manager.add_task(
-        project_id,
+    user = auth_manager.get_user_by_info(request.state.info)
+
+    task_master_manager.queue_task(
+        user.organization_id,
+        user.id,
         TaskType.INFORMATION_SOURCE,
-        user_id,
         {
+            "project_id": project_id,
             "information_source_id": heuristic_id,
         },
     )
diff --git a/route_prefix.py b/route_prefix.py
index 3215274a..873f6e86 100644
--- a/route_prefix.py
+++ b/route_prefix.py
@@ -17,3 +17,4 @@
 PREFIX_RECORD = PREFIX + "/record"
 PREFIX_WEAK_SUPERVISION = PREFIX + "/weak-supervision"
 PREFIX_LABELING_TASKS = PREFIX + "/labeling-tasks"
+PREFIX_TASK_EXECUTION = PREFIX + "/task-execution"
diff --git a/start b/start
index b92bef6b..1bb726b3 100755
--- a/start
+++ b/start
@@ -73,8 +73,7 @@ docker run -d --rm \
-e GATES=http://gates-gateway:80 \
-e COGNITION_GATEWAY=http://cognition-gateway:80 \
-e KRATOS_ADMIN_URL=http://kratos:4434 \
--e TASK_QUEUE_SLOTS=1 \
--e PRIORITY_TASK_QUEUE_SLOTS=1 \
+-e TASK_MASTER=http://cognition-task-master:80 \
-e INFERENCE_DIR=$INFERENCE_DIR \
-e SECRET_KEY=default \
-e POSTGRES_POOL_USE_LIFO=x \
diff --git a/submodules/model b/submodules/model
index 4afc09b4..6a0b51a5 160000
--- a/submodules/model
+++ b/submodules/model
@@ -1 +1 @@
-Subproject commit 4afc09b47cf27d6e7537cdb8ea614b8e26d2e022
+Subproject commit 6a0b51a5e7cf88d58cb258a6c9014c4644626faa
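
Reviewer note: controller/task_master/manager.py is referenced throughout this patch but not included in it, so the sketch below is a minimal, hypothetical reconstruction inferred purely from the call sites above. The signatures of queue_task/delete_task, the priority flag, and the response shape (response.ok, then "task_id" and "position" in the JSON body) are taken from the callers (import_wizard.py, heuristic.py, project_setting.py, embedding.py); the endpoint paths and JSON key names are assumptions.

# Hypothetical sketch of controller/task_master/manager.py -- NOT part of this diff.
# Assumed: the "/task/queue" and "/task/{task_id}" paths and the JSON key names.
import os
from typing import Any, Dict, Union

import requests

from submodules.model import enums

# The start script now exports TASK_MASTER=http://cognition-task-master:80.
BASE_URI = os.getenv("TASK_MASTER", "")


def queue_task(
    org_id: str,
    user_id: str,
    task_type: Union[str, enums.TaskType],
    task_info: Dict[str, Any],
    priority: bool = False,
) -> requests.Response:
    # Tasks are now scoped by organization instead of project; any project
    # reference travels inside task_info (e.g. task_info["project_id"]).
    # Callers check response.ok and read "task_id" / "position" from
    # response.json().
    if isinstance(task_type, enums.TaskType):
        task_type = task_type.value
    return requests.post(
        f"{BASE_URI}/task/queue",  # assumed path
        json={
            "organization_id": str(org_id),
            "user_id": str(user_id),
            "task_type": task_type,
            "task_info": task_info,
            "priority": priority,
        },
    )


def delete_task(org_id: str, task_id: str) -> requests.Response:
    # Used by delete_from_task_queue in fast_api/routes/embedding.py.
    return requests.delete(
        f"{BASE_URI}/task/{task_id}",  # assumed path
        params={"organization_id": str(org_id)},
    )

The new routes in fast_api/routes/task_execution.py (mounted under PREFIX_TASK_EXECUTION) are presumably the callbacks the task master invokes on the gateway to actually execute a queued task.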