From 0e7b72b42d02d26fc57faa7d28e889fccbc301fd Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Tue, 10 Dec 2024 14:22:09 +0100 Subject: [PATCH 01/23] Initial commit, removes unused endpoints due to sdk removal --- api/transfer.py | 100 +----------------------------------------------- app.py | 13 +------ 2 files changed, 2 insertions(+), 111 deletions(-) diff --git a/api/transfer.py b/api/transfer.py index 15fd5f55..7a17679e 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -11,11 +11,9 @@ import_wizard as cognition_import_wizard, ) from exceptions.exceptions import BadPasswordError -from submodules.s3 import controller as s3 from submodules.model.business_objects import ( attribute, general, - organization, tokenization, project, ) @@ -27,10 +25,8 @@ from controller.transfer import manager as transfer_manager from controller.upload_task import manager as upload_task_manager from controller.auth import manager as auth_manager -from controller.transfer import association_transfer_manager -from controller.project import manager as project_manager from controller.attribute import manager as attribute_manager - +from controller.project import manager as project_manager from submodules.model import enums, exceptions from util.notification import create_notification from submodules.model.enums import NotificationType @@ -98,72 +94,6 @@ async def post(self, request) -> PlainTextResponse: return PlainTextResponse("OK") -class FileExport(HTTPEndpoint): - def get(self, request) -> JSONResponse: - project_id = request.path_params["project_id"] - user_id = request.query_params["user_id"] - num_samples = request.query_params.get("num_samples") - try: - auth_manager.check_project_access_from_user_id( - user_id, project_id, from_api=True - ) - except exceptions.EntityNotFoundException: - return JSONResponse({"error": "Could not find project"}, status_code=404) - except exceptions.AccessDeniedException: - return JSONResponse({"error": "Access denied"}, status_code=403) - result = transfer_manager.export_records(project_id, num_samples) - return JSONResponse(result) - - -class KnowledgeBaseExport(HTTPEndpoint): - def get(self, request) -> JSONResponse: - project_id = request.path_params["project_id"] - list_id = request.path_params["knowledge_base_id"] - user_id = request.query_params["user_id"] - try: - auth_manager.check_project_access_from_user_id( - user_id, project_id, from_api=True - ) - except exceptions.EntityNotFoundException: - return JSONResponse({"error": "Could not find project"}, status_code=404) - except exceptions.AccessDeniedException: - return JSONResponse({"error": "Access denied"}, status_code=403) - result = transfer_manager.export_knowledge_base(project_id, list_id) - return JSONResponse(result) - - -class PrepareFileImport(HTTPEndpoint): - async def post(self, request) -> JSONResponse: - project_id = request.path_params["project_id"] - request_body = await request.json() - - user_id = request_body["user_id"] - try: - auth_manager.check_project_access_from_user_id( - user_id, project_id, from_api=True - ) - except exceptions.EntityNotFoundException: - return JSONResponse({"error": "Could not find project"}, status_code=404) - except exceptions.AccessDeniedException: - return JSONResponse({"error": "Access denied"}, status_code=403) - file_name = request_body["file_name"] - file_type = request_body["file_type"] - file_import_options = request_body.get("file_import_options") - task = upload_task_manager.create_upload_task( - user_id, - project_id, - file_name, - file_type, - file_import_options, - upload_type=enums.UploadTypes.DEFAULT.value, - ) - org_id = organization.get_id_by_project_id(project_id) - credentials_and_id = s3.get_upload_credentials_and_id( - org_id, f"{project_id}/{task.id}" - ) - return JSONResponse(credentials_and_id) - - class JSONImport(HTTPEndpoint): async def post(self, request) -> JSONResponse: project_id = request.path_params["project_id"] @@ -239,34 +169,6 @@ def put(self, request) -> PlainTextResponse: return PlainTextResponse("OK") -class AssociationsImport(HTTPEndpoint): - async def post(self, request) -> JSONResponse: - # Will be removed as part of the python sdk removal - return JSONResponse({"error": "Not supported anymore"}, status_code=404) - - project_id = request.path_params["project_id"] - request_body = await request.json() - user_id = request_body["user_id"] - try: - auth_manager.check_project_access_from_user_id( - user_id, project_id, from_api=True - ) - except exceptions.EntityNotFoundException: - return JSONResponse({"error": "Could not find project"}, status_code=404) - except exceptions.AccessDeniedException: - return JSONResponse({"error": "Access denied"}, status_code=403) - new_associations_added = association_transfer_manager.import_associations( - project_id, - user_id, - request_body["name"], - request_body["label_task_name"], - request_body["associations"], - request_body["indices"], - request_body["source_type"], - ) - return JSONResponse(new_associations_added) - - class UploadTaskInfo(HTTPEndpoint): def get(self, request) -> JSONResponse: project_id = request.path_params["project_id"] diff --git a/app.py b/app.py index d0483d49..50682f19 100644 --- a/app.py +++ b/app.py @@ -7,13 +7,9 @@ ) from api.project import ProjectDetails from api.transfer import ( - AssociationsImport, - FileExport, - JSONImport, - KnowledgeBaseExport, Notify, - PrepareFileImport, UploadTaskInfo, + JSONImport, CognitionImport, CognitionPrepareProject, ) @@ -124,13 +120,6 @@ Route("/notify/{path:path}", Notify), Route("/healthcheck", Healthcheck), Route("/project/{project_id:str}", ProjectDetails), - Route( - "/project/{project_id:str}/knowledge_base/{knowledge_base_id:str}", - KnowledgeBaseExport, - ), - Route("/project/{project_id:str}/associations", AssociationsImport), - Route("/project/{project_id:str}/export", FileExport), - Route("/project/{project_id:str}/import_file", PrepareFileImport), Route("/project/{project_id:str}/import_json", JSONImport), Route( "/project/{project_id:str}/cognition/continue/{task_id:str}", CognitionImport From ad63c865a61e7c1f715132821d311bc7fb04b7a1 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Wed, 11 Dec 2024 13:49:40 +0100 Subject: [PATCH 02/23] remove projectdetails --- api/project.py | 34 ---------------------------------- app.py | 3 --- 2 files changed, 37 deletions(-) diff --git a/api/project.py b/api/project.py index 63ed5059..b49a09a7 100644 --- a/api/project.py +++ b/api/project.py @@ -8,37 +8,3 @@ from submodules.model import exceptions logging.basicConfig(level=logging.DEBUG) - - -class ProjectDetails(HTTPEndpoint): - def get(self, request) -> JSONResponse: - project_id = request.path_params["project_id"] - user_id = request.query_params["user_id"] - try: - auth_manager.check_project_access_from_user_id( - user_id, project_id, from_api=True - ) - except exceptions.EntityNotFoundException: - return JSONResponse({"error": "Could not find project"}, status_code=404) - except exceptions.AccessDeniedException: - return JSONResponse({"error": "Access denied"}, status_code=403) - project = project_manager.get_project(project_id) - max_running_id = project_manager.get_max_running_id(project_id) - attributes = attribute_manager.get_all_attributes(project_id, ["ALL"]) - result = { - "name": project.name, - "max_running_id": max_running_id, - "description": project.description, - "tokenizer": project.tokenizer, - "attributes": [ - { - "name": attribute.name, - "data_type": attribute.data_type, - "is_primary_key": attribute.is_primary_key, - "state": attribute.state, - } - for attribute in attributes - ], - "knowledge_base_ids": [str(list.id) for list in project.knowledge_bases], - } - return JSONResponse(result) diff --git a/app.py b/app.py index 50682f19..bdda54f7 100644 --- a/app.py +++ b/app.py @@ -5,7 +5,6 @@ from api.misc import ( FullConfigRest, ) -from api.project import ProjectDetails from api.transfer import ( Notify, UploadTaskInfo, @@ -118,8 +117,6 @@ routes = [ Route("/full_config", FullConfigRest), Route("/notify/{path:path}", Notify), - Route("/healthcheck", Healthcheck), - Route("/project/{project_id:str}", ProjectDetails), Route("/project/{project_id:str}/import_json", JSONImport), Route( "/project/{project_id:str}/cognition/continue/{task_id:str}", CognitionImport From ec493f2cec641efcec8e486c2e41aa107978cb6b Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Wed, 11 Dec 2024 13:50:14 +0100 Subject: [PATCH 03/23] remove projectdetails --- app.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/app.py b/app.py index bdda54f7..44a1d846 100644 --- a/app.py +++ b/app.py @@ -5,6 +5,7 @@ from api.misc import ( FullConfigRest, ) + from api.transfer import ( Notify, UploadTaskInfo, @@ -117,6 +118,7 @@ routes = [ Route("/full_config", FullConfigRest), Route("/notify/{path:path}", Notify), + Route("/healthcheck", Healthcheck), Route("/project/{project_id:str}/import_json", JSONImport), Route( "/project/{project_id:str}/cognition/continue/{task_id:str}", CognitionImport From 036c7ba1046e4cfc89462ef7369c23d70081e782 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Wed, 11 Dec 2024 13:50:38 +0100 Subject: [PATCH 04/23] remove file project.py --- api/project.py | 10 ---------- 1 file changed, 10 deletions(-) delete mode 100644 api/project.py diff --git a/api/project.py b/api/project.py deleted file mode 100644 index b49a09a7..00000000 --- a/api/project.py +++ /dev/null @@ -1,10 +0,0 @@ -import logging -from starlette.endpoints import HTTPEndpoint -from starlette.responses import JSONResponse - -from controller.auth import manager as auth_manager -from controller.project import manager as project_manager -from controller.attribute import manager as attribute_manager -from submodules.model import exceptions - -logging.basicConfig(level=logging.DEBUG) From 8457a5adaed5a620a95759de75fdf794562b114e Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Wed, 11 Dec 2024 13:51:17 +0100 Subject: [PATCH 05/23] remove health check --- api/healthcheck.py | 18 ------------------ app.py | 2 -- 2 files changed, 20 deletions(-) delete mode 100644 api/healthcheck.py diff --git a/api/healthcheck.py b/api/healthcheck.py deleted file mode 100644 index 53a712d2..00000000 --- a/api/healthcheck.py +++ /dev/null @@ -1,18 +0,0 @@ -from starlette.endpoints import HTTPEndpoint -from starlette.responses import PlainTextResponse -from submodules.model.business_objects import general -from starlette import status - - -class Healthcheck(HTTPEndpoint): - def get(self, request) -> PlainTextResponse: - text = "" - status_code = status.HTTP_200_OK - database_test = general.test_database_connection() - if not database_test.get("success"): - error_name = database_test.get("error") - text += f"database_error:{error_name}:" - status_code = status.HTTP_500_INTERNAL_SERVER_ERROR - if not text: - text = "OK" - return PlainTextResponse(text, status_code=status_code) diff --git a/app.py b/app.py index 44a1d846..e7e3e005 100644 --- a/app.py +++ b/app.py @@ -1,6 +1,5 @@ import logging from fastapi import FastAPI -from api.healthcheck import Healthcheck from starlette.middleware import Middleware from api.misc import ( FullConfigRest, @@ -118,7 +117,6 @@ routes = [ Route("/full_config", FullConfigRest), Route("/notify/{path:path}", Notify), - Route("/healthcheck", Healthcheck), Route("/project/{project_id:str}/import_json", JSONImport), Route( "/project/{project_id:str}/cognition/continue/{task_id:str}", CognitionImport From 59fceb3e0e35288cf253eeb356aa7264ff439e43 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Wed, 11 Dec 2024 13:57:59 +0100 Subject: [PATCH 06/23] remove notify --- api/transfer.py | 53 ------------------------------------------------- app.py | 2 -- 2 files changed, 55 deletions(-) diff --git a/api/transfer.py b/api/transfer.py index 7a17679e..664f9100 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -10,7 +10,6 @@ import_preparator as cognition_preparator, import_wizard as cognition_import_wizard, ) -from exceptions.exceptions import BadPasswordError from submodules.model.business_objects import ( attribute, general, @@ -33,7 +32,6 @@ from submodules.model.models import UploadTask from util import notification from submodules.model import daemon -from controller.transfer.cognition.minio_upload import handle_cognition_file_upload from controller.task_master import manager as task_master_manager from submodules.model.enums import TaskType, RecordTokenizationScope @@ -43,57 +41,6 @@ logger = logging.getLogger(__name__) -class Notify(HTTPEndpoint): - async def post(self, request) -> PlainTextResponse: - data = await request.json() - file_path = data["Key"] - - parts = file_path.split("/") - - if parts[1] == "_cognition": - handle_cognition_file_upload(parts) - return PlainTextResponse("OK") - - if len(parts) != 4: - # We need handling for lf execution notification here. - # ATM we have a different path of handling in util/payload_scheduler.py update_records method - return PlainTextResponse("OK") - - org_id, project_id, upload_task_id, file_name = parts - if len(project_id) != 36: - return PlainTextResponse("OK") - if upload_task_id == "download": - return PlainTextResponse("OK") - if org_id == "archive": - return PlainTextResponse("OK") - - task = upload_task_manager.get_upload_task_secure( - upload_task_id=upload_task_id, - project_id=project_id, - file_name=file_name, - ) - is_global_update = True if task.file_type == "project" else False - try: - init_file_import(task, project_id, is_global_update) - except BadPasswordError: - file_import_error_handling( - task, - project_id, - is_global_update, - enums.NotificationType.BAD_PASSWORD_DURING_IMPORT, - print_traceback=False, - ) - notification.send_organization_update( - project_id, f"bad_password:{project_id}", True - ) - except Exception: - file_import_error_handling(task, project_id, is_global_update) - notification.send_organization_update( - project_id, f"project_update:{project_id}", True - ) - return PlainTextResponse("OK") - - class JSONImport(HTTPEndpoint): async def post(self, request) -> JSONResponse: project_id = request.path_params["project_id"] diff --git a/app.py b/app.py index e7e3e005..66634dfa 100644 --- a/app.py +++ b/app.py @@ -6,7 +6,6 @@ ) from api.transfer import ( - Notify, UploadTaskInfo, JSONImport, CognitionImport, @@ -116,7 +115,6 @@ routes = [ Route("/full_config", FullConfigRest), - Route("/notify/{path:path}", Notify), Route("/project/{project_id:str}/import_json", JSONImport), Route( "/project/{project_id:str}/cognition/continue/{task_id:str}", CognitionImport From 770f3f64a21944e01125dfc8dc87c4dc21755aad Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Wed, 11 Dec 2024 14:01:41 +0100 Subject: [PATCH 07/23] Remove import/task --- api/transfer.py | 25 ------------------------- app.py | 2 -- 2 files changed, 27 deletions(-) diff --git a/api/transfer.py b/api/transfer.py index 664f9100..ae4fe3e6 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -116,31 +116,6 @@ def put(self, request) -> PlainTextResponse: return PlainTextResponse("OK") -class UploadTaskInfo(HTTPEndpoint): - def get(self, request) -> JSONResponse: - project_id = request.path_params["project_id"] - task_id = request.path_params["task_id"] - user_id = request.query_params["user_id"] - try: - auth_manager.check_project_access_from_user_id( - user_id, project_id, from_api=True - ) - except exceptions.EntityNotFoundException: - return JSONResponse({"error": "Could not find project"}, status_code=404) - except exceptions.AccessDeniedException: - return JSONResponse({"error": "Access denied"}, status_code=403) - task = upload_task_manager.get_upload_task(project_id, task_id) - task_dict = { - "id": str(task.id), - "file_name": str(task.file_name), - "file_type": str(task.file_type), - "progress": task.progress, - "state": str(task.state), - "started_at": str(task.started_at), - } - return JSONResponse(task_dict) - - def init_file_import(task: UploadTask, project_id: str, is_global_update: bool) -> None: task_state = task.state if "records" in task.file_type: diff --git a/app.py b/app.py index 66634dfa..98f9c03f 100644 --- a/app.py +++ b/app.py @@ -6,7 +6,6 @@ ) from api.transfer import ( - UploadTaskInfo, JSONImport, CognitionImport, CognitionPrepareProject, @@ -123,7 +122,6 @@ "/project/{cognition_project_id:str}/cognition/continue/{task_id:str}/finalize", CognitionPrepareProject, ), - Route("/project/{project_id:str}/import/task/{task_id:str}", UploadTaskInfo), Mount("/api", app=fastapi_app, name="REST API"), Mount( "/internal/api", app=fastapi_app_internal, name="INTERNAL REST API" From 2cd09d8735f7d056912b100961cc0a0bc5d1cdb4 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Wed, 11 Dec 2024 14:05:46 +0100 Subject: [PATCH 08/23] Remove Import --- api/transfer.py | 25 ------------------------- app.py | 4 ---- 2 files changed, 29 deletions(-) diff --git a/api/transfer.py b/api/transfer.py index ae4fe3e6..59896e6a 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -73,31 +73,6 @@ async def post(self, request) -> JSONResponse: return JSONResponse({"success": True}) -class CognitionImport(HTTPEndpoint): - def put(self, request) -> PlainTextResponse: - project_id = request.path_params["project_id"] - task_id = request.path_params["task_id"] - task = upload_task_manager.get_upload_task( - task_id=task_id, - project_id=project_id, - ) - if task.upload_type != enums.UploadTypes.COGNITION.value: - return PlainTextResponse("OK") - # since upload type is set to COGNITION for the first step of the upload (file upload / mapping prep) - # this / the continuation of the import should only be done once so we set it back to default to prevent this & differentiate between the steps - task.upload_type = enums.UploadTypes.DEFAULT.value - if task.state != enums.UploadStates.PREPARED.value: - return PlainTextResponse("Bad upload task", status_code=400) - try: - init_file_import(task, project_id, False) - except Exception: - file_import_error_handling(task, project_id, False) - notification.send_organization_update( - project_id, f"project_update:{project_id}", True - ) - return PlainTextResponse("OK") - - class CognitionPrepareProject(HTTPEndpoint): def put(self, request) -> PlainTextResponse: cognition_project_id = request.path_params["cognition_project_id"] diff --git a/app.py b/app.py index 98f9c03f..415d9eeb 100644 --- a/app.py +++ b/app.py @@ -7,7 +7,6 @@ from api.transfer import ( JSONImport, - CognitionImport, CognitionPrepareProject, ) from config_handler import ( @@ -115,9 +114,6 @@ routes = [ Route("/full_config", FullConfigRest), Route("/project/{project_id:str}/import_json", JSONImport), - Route( - "/project/{project_id:str}/cognition/continue/{task_id:str}", CognitionImport - ), Route( "/project/{cognition_project_id:str}/cognition/continue/{task_id:str}/finalize", CognitionPrepareProject, From 365569010f6bdbded99aa2ba9623bc71097bc227 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Wed, 11 Dec 2024 14:08:05 +0100 Subject: [PATCH 09/23] Remove CognitionPrepareProject --- api/transfer.py | 25 +------------------------ app.py | 5 ----- 2 files changed, 1 insertion(+), 29 deletions(-) diff --git a/api/transfer.py b/api/transfer.py index 59896e6a..4b86515b 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -3,12 +3,11 @@ import time from typing import Optional from starlette.endpoints import HTTPEndpoint -from starlette.responses import PlainTextResponse, JSONResponse +from starlette.responses import JSONResponse from controller.embedding.manager import recreate_embeddings from controller.transfer.cognition import ( import_preparator as cognition_preparator, - import_wizard as cognition_import_wizard, ) from submodules.model.business_objects import ( attribute, @@ -17,10 +16,6 @@ project, ) -from submodules.model.cognition_objects import ( - project as cognition_project, -) - from controller.transfer import manager as transfer_manager from controller.upload_task import manager as upload_task_manager from controller.auth import manager as auth_manager @@ -73,24 +68,6 @@ async def post(self, request) -> JSONResponse: return JSONResponse({"success": True}) -class CognitionPrepareProject(HTTPEndpoint): - def put(self, request) -> PlainTextResponse: - cognition_project_id = request.path_params["cognition_project_id"] - - cognition_project_item = cognition_project.get(cognition_project_id) - if not cognition_project_item: - return PlainTextResponse("Bad project id", status_code=400) - task_id = request.path_params["task_id"] - - daemon.run_without_db_token( - cognition_import_wizard.prepare_and_finalize_setup, - cognition_project_id=cognition_project_id, - task_id=task_id, - ) - - return PlainTextResponse("OK") - - def init_file_import(task: UploadTask, project_id: str, is_global_update: bool) -> None: task_state = task.state if "records" in task.file_type: diff --git a/app.py b/app.py index 415d9eeb..29ba56a1 100644 --- a/app.py +++ b/app.py @@ -7,7 +7,6 @@ from api.transfer import ( JSONImport, - CognitionPrepareProject, ) from config_handler import ( init_config, @@ -114,10 +113,6 @@ routes = [ Route("/full_config", FullConfigRest), Route("/project/{project_id:str}/import_json", JSONImport), - Route( - "/project/{cognition_project_id:str}/cognition/continue/{task_id:str}/finalize", - CognitionPrepareProject, - ), Mount("/api", app=fastapi_app, name="REST API"), Mount( "/internal/api", app=fastapi_app_internal, name="INTERNAL REST API" From 11fde32f9a0c28ffbf9180db47f05f00f92f2080 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Wed, 11 Dec 2024 14:26:48 +0100 Subject: [PATCH 10/23] Remove unused functions --- api/transfer.py | 155 ------------------------------------------------ app.py | 3 +- 2 files changed, 2 insertions(+), 156 deletions(-) diff --git a/api/transfer.py b/api/transfer.py index 4b86515b..d4335da4 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -1,35 +1,22 @@ import logging import traceback -import time from typing import Optional from starlette.endpoints import HTTPEndpoint from starlette.responses import JSONResponse -from controller.embedding.manager import recreate_embeddings -from controller.transfer.cognition import ( - import_preparator as cognition_preparator, -) from submodules.model.business_objects import ( - attribute, general, - tokenization, - project, ) from controller.transfer import manager as transfer_manager from controller.upload_task import manager as upload_task_manager from controller.auth import manager as auth_manager -from controller.attribute import manager as attribute_manager from controller.project import manager as project_manager from submodules.model import enums, exceptions from util.notification import create_notification from submodules.model.enums import NotificationType from submodules.model.models import UploadTask from util import notification -from submodules.model import daemon - -from controller.task_master import manager as task_master_manager -from submodules.model.enums import TaskType, RecordTokenizationScope logging.basicConfig(level=logging.DEBUG) @@ -68,48 +55,6 @@ async def post(self, request) -> JSONResponse: return JSONResponse({"success": True}) -def init_file_import(task: UploadTask, project_id: str, is_global_update: bool) -> None: - task_state = task.state - if "records" in task.file_type: - if task.upload_type == enums.UploadTypes.COGNITION.value: - cognition_preparator.prepare_cognition_import(project_id, task) - else: - transfer_manager.import_records_from_file(project_id, task) - daemon.run_with_db_token( - __recalculate_missing_attributes_and_embeddings, - project_id, - str(task.user_id), - ) - - elif "project" in task.file_type: - transfer_manager.import_project(project_id, task) - elif "knowledge_base" in task.file_type: - transfer_manager.import_knowledge_base(project_id, task) - - if task.state == task_state: - # update is sent in update task if it was updated (e.g. with labeling studio) - notification.send_organization_update( - project_id, - f"file_upload:{str(task.id)}:state:{task.state}", - is_global_update, - ) - if task.file_type != "knowledge_base": - only_usable_attributes = task.file_type == "records_add" - project_item = project.get(project_id) - org_id = project_item.organization_id - task_master_manager.queue_task( - str(org_id), - str(task.user_id), - TaskType.TOKENIZATION, - { - "project_id": str(project_id), - "scope": RecordTokenizationScope.PROJECT.value, - "include_rats": True, - "only_uploaded_attributes": only_usable_attributes, - }, - ) - - def file_import_error_handling( task: UploadTask, project_id: str, @@ -139,103 +84,3 @@ def file_import_error_handling( notification.send_organization_update( project_id, f"file_upload:{str(task.id)}:state:{task.state}", is_global_update ) - - -def __recalculate_missing_attributes_and_embeddings( - project_id: str, user_id: str -) -> None: - __calculate_missing_attributes(project_id, user_id) - recreate_embeddings(project_id) - - -def __calculate_missing_attributes(project_id: str, user_id: str) -> None: - # wait a second to ensure that the process is started in the tokenization service - time.sleep(5) - attributes_usable = attribute.get_all_ordered( - project_id, - True, - state_filter=[ - enums.AttributeState.USABLE.value, - ], - ) - if len(attributes_usable) == 0: - return - - # stored as list so connection results do not affect - attribute_ids = [str(att_usable.id) for att_usable in attributes_usable] - for att_id in attribute_ids: - attribute.update(project_id, att_id, state=enums.AttributeState.INITIAL.value) - general.commit() - notification.send_organization_update( - project_id=project_id, message="calculate_attribute:started:all" - ) - try: - # first check project tokenization completed - i = 0 - while True: - i += 1 - if i >= 60: - i = 0 - daemon.reset_session_token_in_thread() - if tokenization.is_doc_bin_creation_running_or_queued(project_id): - time.sleep(2) - continue - else: - break - # next, ensure that the attributes are calculated and tokenized - i = 0 - while True: - time.sleep(1) - i += 1 - if len(attribute_ids) == 0: - break - if i >= 60: - i = 0 - daemon.reset_session_token_in_thread() - - current_att_id = attribute_ids[0] - current_att = attribute.get(project_id, current_att_id) - if current_att.state == enums.AttributeState.RUNNING.value: - continue - elif current_att.state == enums.AttributeState.INITIAL.value: - attribute_manager.calculate_user_attribute_all_records( - project_id, - project.get_org_id(project_id), - user_id, - current_att_id, - True, - ) - else: - if tokenization.is_doc_bin_creation_running_for_attribute( - project_id, current_att.name - ): - time.sleep(2) - continue - else: - attribute_ids.pop(0) - notification.send_organization_update( - project_id=project_id, - message=f"calculate_attribute:finished:{current_att_id}", - ) - time.sleep(2) - except Exception as e: - print( - f"Error while recreating attribute calculation for {project_id} when new records are uploaded : {e}" - ) - get_initial_attributes = attribute.get_all_ordered( - project_id, - True, - state_filter=[ - enums.AttributeState.INITIAL.value, - ], - ) - for attr in get_initial_attributes: - attribute.update( - project_id, attr.id, state=enums.AttributeState.FAILED.value - ) - general.commit() - finally: - notification.send_organization_update( - project_id=project_id, - message="calculate_attribute:finished:all", - ) diff --git a/app.py b/app.py index 29ba56a1..d3e0bf6f 100644 --- a/app.py +++ b/app.py @@ -1,16 +1,17 @@ import logging from fastapi import FastAPI from starlette.middleware import Middleware + from api.misc import ( FullConfigRest, ) - from api.transfer import ( JSONImport, ) from config_handler import ( init_config, ) + from fast_api.routes.organization import router as org_router from fast_api.routes.project import router as project_router from fast_api.routes.project_setting import router as project_setting_router From 5572c7fca2c49c7793cd4c572d5587407c567a4f Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Wed, 11 Dec 2024 14:27:07 +0100 Subject: [PATCH 11/23] Remove unused functions --- api/transfer.py | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/api/transfer.py b/api/transfer.py index d4335da4..c626aa3b 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -53,34 +53,3 @@ async def post(self, request) -> JSONResponse: request_body["is_last"], ) return JSONResponse({"success": True}) - - -def file_import_error_handling( - task: UploadTask, - project_id: str, - is_global_update: bool, - notification_type: Optional[NotificationType] = None, - print_traceback: bool = True, -) -> None: - general.rollback() - task.state = enums.UploadStates.ERROR.value - general.commit() - if not notification_type: - notification_type = NotificationType.IMPORT_FAILED - create_notification( - notification_type, - task.user_id, - task.project_id, - task.file_type, - ) - logger.error( - upload_task_manager.get_upload_task_message( - task, - ) - ) - if print_traceback: - print(traceback.format_exc(), flush=True) - - notification.send_organization_update( - project_id, f"file_upload:{str(task.id)}:state:{task.state}", is_global_update - ) From 5a366512899449347e2c3fc6e3809142000ab715 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Wed, 11 Dec 2024 14:27:43 +0100 Subject: [PATCH 12/23] Tidy up imports --- api/transfer.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/api/transfer.py b/api/transfer.py index c626aa3b..266171e5 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -1,22 +1,11 @@ import logging -import traceback -from typing import Optional from starlette.endpoints import HTTPEndpoint from starlette.responses import JSONResponse -from submodules.model.business_objects import ( - general, -) - from controller.transfer import manager as transfer_manager -from controller.upload_task import manager as upload_task_manager from controller.auth import manager as auth_manager from controller.project import manager as project_manager -from submodules.model import enums, exceptions -from util.notification import create_notification -from submodules.model.enums import NotificationType -from submodules.model.models import UploadTask -from util import notification +from submodules.model import exceptions logging.basicConfig(level=logging.DEBUG) From 62f9144cae260276cb667a6ff81a7aa9e3d5cedf Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Thu, 12 Dec 2024 10:42:04 +0100 Subject: [PATCH 13/23] import cleanup --- fast_api/routes/heuristic.py | 1 - fast_api/routes/labeling.py | 2 +- fast_api/routes/labeling_tasks.py | 3 --- fast_api/routes/project_setting.py | 1 - util/notification.py | 6 ++---- 5 files changed, 3 insertions(+), 10 deletions(-) diff --git a/fast_api/routes/heuristic.py b/fast_api/routes/heuristic.py index 87b24ae0..7927dc5f 100644 --- a/fast_api/routes/heuristic.py +++ b/fast_api/routes/heuristic.py @@ -8,7 +8,6 @@ from controller.payload import manager as payload_manager from submodules.model.business_objects import information_source from submodules.model.business_objects.payload import get_payload_with_heuristic_type -from submodules.model.enums import InformationSourceType from submodules.model.util import pack_edges_node, sql_alchemy_to_dict from util import notification from controller.task_master import manager as task_master_manager diff --git a/fast_api/routes/labeling.py b/fast_api/routes/labeling.py index 677b8017..d1d2bb54 100644 --- a/fast_api/routes/labeling.py +++ b/fast_api/routes/labeling.py @@ -15,7 +15,7 @@ StringBody, TokenizedRecordBody, ) -from submodules.model import enums, events +from submodules.model import enums from fast_api.routes.client_response import pack_json_result from controller.labeling_access_link import manager from controller.labeling_task_label import manager as label_manager diff --git a/fast_api/routes/labeling_tasks.py b/fast_api/routes/labeling_tasks.py index 235a2e0d..2d505d72 100644 --- a/fast_api/routes/labeling_tasks.py +++ b/fast_api/routes/labeling_tasks.py @@ -12,11 +12,8 @@ from controller.auth import manager as auth_manager from controller.labeling_task import manager as labeling_manager -from controller.project import manager as project_manager from controller.labeling_task_label import manager as label_manager -from controller.labeling_task import manager as task_manager from fast_api.routes.client_response import pack_json_result -from submodules.model import events from util import notification diff --git a/fast_api/routes/project_setting.py b/fast_api/routes/project_setting.py index 14ed903a..78071446 100644 --- a/fast_api/routes/project_setting.py +++ b/fast_api/routes/project_setting.py @@ -23,7 +23,6 @@ from fast_api.routes.client_response import pack_json_result from submodules.model.enums import TaskType from submodules.model.util import sql_alchemy_to_dict -from submodules.model import events from util import notification import traceback import json diff --git a/util/notification.py b/util/notification.py index d7e752d6..5b9150dc 100644 --- a/util/notification.py +++ b/util/notification.py @@ -1,14 +1,12 @@ import os -from typing import Union, List, Dict, Optional +from typing import List, Dict, Optional import requests import logging from controller.notification.notification_data import __notification_data -from submodules.model import events from exceptions import exceptions -from controller.user.manager import get_or_create_user -from submodules.model.business_objects import project, general, organization +from submodules.model.business_objects import project, organization from submodules.model.business_objects.notification import get_duplicated, create from submodules.model.business_objects.organization import get_organization_id from submodules.model.enums import NotificationType From 8b3a40d40974a5659db8bc7a53039bc1746b1a40 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Thu, 12 Dec 2024 13:48:46 +0100 Subject: [PATCH 14/23] Re-add Notify and Healthcheck --- api/healthcheck.py | 18 ++++ api/transfer.py | 255 ++++++++++++++++++++++++++++++++++++++++++++- app.py | 4 + 3 files changed, 275 insertions(+), 2 deletions(-) create mode 100644 api/healthcheck.py diff --git a/api/healthcheck.py b/api/healthcheck.py new file mode 100644 index 00000000..53a712d2 --- /dev/null +++ b/api/healthcheck.py @@ -0,0 +1,18 @@ +from starlette.endpoints import HTTPEndpoint +from starlette.responses import PlainTextResponse +from submodules.model.business_objects import general +from starlette import status + + +class Healthcheck(HTTPEndpoint): + def get(self, request) -> PlainTextResponse: + text = "" + status_code = status.HTTP_200_OK + database_test = general.test_database_connection() + if not database_test.get("success"): + error_name = database_test.get("error") + text += f"database_error:{error_name}:" + status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + if not text: + text = "OK" + return PlainTextResponse(text, status_code=status_code) diff --git a/api/transfer.py b/api/transfer.py index 266171e5..2f0268f0 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -1,11 +1,38 @@ import logging +import traceback +import time +from typing import Optional from starlette.endpoints import HTTPEndpoint -from starlette.responses import JSONResponse +from starlette.responses import PlainTextResponse, JSONResponse +from controller.embedding.manager import recreate_embeddings + +from controller.transfer.cognition import ( + import_preparator as cognition_preparator, +) +from exceptions.exceptions import BadPasswordError +from submodules.model.business_objects import ( + attribute, + general, + tokenization, + project, +) from controller.transfer import manager as transfer_manager +from controller.upload_task import manager as upload_task_manager from controller.auth import manager as auth_manager from controller.project import manager as project_manager -from submodules.model import exceptions +from controller.attribute import manager as attribute_manager + +from submodules.model import enums, exceptions +from util.notification import create_notification +from submodules.model.enums import NotificationType +from submodules.model.models import UploadTask +from util import notification +from submodules.model import daemon +from controller.transfer.cognition.minio_upload import handle_cognition_file_upload + +from controller.task_master import manager as task_master_manager +from submodules.model.enums import TaskType, RecordTokenizationScope logging.basicConfig(level=logging.DEBUG) @@ -42,3 +69,227 @@ async def post(self, request) -> JSONResponse: request_body["is_last"], ) return JSONResponse({"success": True}) + + +class Notify(HTTPEndpoint): + async def post(self, request) -> PlainTextResponse: + data = await request.json() + file_path = data["Key"] + + parts = file_path.split("/") + + if parts[1] == "_cognition": + handle_cognition_file_upload(parts) + return PlainTextResponse("OK") + + if len(parts) != 4: + # We need handling for lf execution notification here. + # ATM we have a different path of handling in util/payload_scheduler.py update_records method + return PlainTextResponse("OK") + + org_id, project_id, upload_task_id, file_name = parts + if len(project_id) != 36: + return PlainTextResponse("OK") + if upload_task_id == "download": + return PlainTextResponse("OK") + if org_id == "archive": + return PlainTextResponse("OK") + + task = upload_task_manager.get_upload_task_secure( + upload_task_id=upload_task_id, + project_id=project_id, + file_name=file_name, + ) + is_global_update = True if task.file_type == "project" else False + try: + init_file_import(task, project_id, is_global_update) + except BadPasswordError: + file_import_error_handling( + task, + project_id, + is_global_update, + enums.NotificationType.BAD_PASSWORD_DURING_IMPORT, + print_traceback=False, + ) + notification.send_organization_update( + project_id, f"bad_password:{project_id}", True + ) + except Exception: + file_import_error_handling(task, project_id, is_global_update) + notification.send_organization_update( + project_id, f"project_update:{project_id}", True + ) + return PlainTextResponse("OK") + + +def init_file_import(task: UploadTask, project_id: str, is_global_update: bool) -> None: + task_state = task.state + if "records" in task.file_type: + if task.upload_type == enums.UploadTypes.COGNITION.value: + cognition_preparator.prepare_cognition_import(project_id, task) + else: + transfer_manager.import_records_from_file(project_id, task) + daemon.run_with_db_token( + __recalculate_missing_attributes_and_embeddings, + project_id, + str(task.user_id), + ) + + elif "project" in task.file_type: + transfer_manager.import_project(project_id, task) + elif "knowledge_base" in task.file_type: + transfer_manager.import_knowledge_base(project_id, task) + + if task.state == task_state: + # update is sent in update task if it was updated (e.g. with labeling studio) + notification.send_organization_update( + project_id, + f"file_upload:{str(task.id)}:state:{task.state}", + is_global_update, + ) + if task.file_type != "knowledge_base": + only_usable_attributes = task.file_type == "records_add" + project_item = project.get(project_id) + org_id = project_item.organization_id + task_master_manager.queue_task( + str(org_id), + str(task.user_id), + TaskType.TOKENIZATION, + { + "project_id": str(project_id), + "scope": RecordTokenizationScope.PROJECT.value, + "include_rats": True, + "only_uploaded_attributes": only_usable_attributes, + }, + ) + + +def file_import_error_handling( + task: UploadTask, + project_id: str, + is_global_update: bool, + notification_type: Optional[NotificationType] = None, + print_traceback: bool = True, +) -> None: + general.rollback() + task.state = enums.UploadStates.ERROR.value + general.commit() + if not notification_type: + notification_type = NotificationType.IMPORT_FAILED + create_notification( + notification_type, + task.user_id, + task.project_id, + task.file_type, + ) + logger.error( + upload_task_manager.get_upload_task_message( + task, + ) + ) + if print_traceback: + print(traceback.format_exc(), flush=True) + + notification.send_organization_update( + project_id, f"file_upload:{str(task.id)}:state:{task.state}", is_global_update + ) + + +def __recalculate_missing_attributes_and_embeddings( + project_id: str, user_id: str +) -> None: + __calculate_missing_attributes(project_id, user_id) + recreate_embeddings(project_id) + + +def __calculate_missing_attributes(project_id: str, user_id: str) -> None: + # wait a second to ensure that the process is started in the tokenization service + time.sleep(5) + attributes_usable = attribute.get_all_ordered( + project_id, + True, + state_filter=[ + enums.AttributeState.USABLE.value, + ], + ) + if len(attributes_usable) == 0: + return + + # stored as list so connection results do not affect + attribute_ids = [str(att_usable.id) for att_usable in attributes_usable] + for att_id in attribute_ids: + attribute.update(project_id, att_id, state=enums.AttributeState.INITIAL.value) + general.commit() + notification.send_organization_update( + project_id=project_id, message="calculate_attribute:started:all" + ) + try: + # first check project tokenization completed + i = 0 + while True: + i += 1 + if i >= 60: + i = 0 + daemon.reset_session_token_in_thread() + if tokenization.is_doc_bin_creation_running_or_queued(project_id): + time.sleep(2) + continue + else: + break + # next, ensure that the attributes are calculated and tokenized + i = 0 + while True: + time.sleep(1) + i += 1 + if len(attribute_ids) == 0: + break + if i >= 60: + i = 0 + daemon.reset_session_token_in_thread() + + current_att_id = attribute_ids[0] + current_att = attribute.get(project_id, current_att_id) + if current_att.state == enums.AttributeState.RUNNING.value: + continue + elif current_att.state == enums.AttributeState.INITIAL.value: + attribute_manager.calculate_user_attribute_all_records( + project_id, + project.get_org_id(project_id), + user_id, + current_att_id, + True, + ) + else: + if tokenization.is_doc_bin_creation_running_for_attribute( + project_id, current_att.name + ): + time.sleep(2) + continue + else: + attribute_ids.pop(0) + notification.send_organization_update( + project_id=project_id, + message=f"calculate_attribute:finished:{current_att_id}", + ) + time.sleep(2) + except Exception as e: + print( + f"Error while recreating attribute calculation for {project_id} when new records are uploaded : {e}" + ) + get_initial_attributes = attribute.get_all_ordered( + project_id, + True, + state_filter=[ + enums.AttributeState.INITIAL.value, + ], + ) + for attr in get_initial_attributes: + attribute.update( + project_id, attr.id, state=enums.AttributeState.FAILED.value + ) + general.commit() + finally: + notification.send_organization_update( + project_id=project_id, + message="calculate_attribute:finished:all", + ) diff --git a/app.py b/app.py index d3e0bf6f..95724433 100644 --- a/app.py +++ b/app.py @@ -2,11 +2,13 @@ from fastapi import FastAPI from starlette.middleware import Middleware +from api.healthcheck import Healthcheck from api.misc import ( FullConfigRest, ) from api.transfer import ( JSONImport, + Notify, ) from config_handler import ( init_config, @@ -113,6 +115,8 @@ routes = [ Route("/full_config", FullConfigRest), + Route("/notify/{path:path}", Notify), + Route("/healthcheck", Healthcheck), Route("/project/{project_id:str}/import_json", JSONImport), Mount("/api", app=fastapi_app, name="REST API"), Mount( From bb15bfa3b1083e3222b1ebbc36965a8902a6d74f Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Thu, 12 Dec 2024 13:49:39 +0100 Subject: [PATCH 15/23] Remove wizard import --- .../transfer/cognition/import_wizard.py | 784 ------------------ 1 file changed, 784 deletions(-) diff --git a/controller/transfer/cognition/import_wizard.py b/controller/transfer/cognition/import_wizard.py index f45da47e..e69de29b 100644 --- a/controller/transfer/cognition/import_wizard.py +++ b/controller/transfer/cognition/import_wizard.py @@ -1,784 +0,0 @@ -from contextvars import Token -from typing import List, Optional, Dict, Any -import json -import time -import requests -from uuid import uuid4 - -from submodules.model import enums -from submodules.model.business_objects import ( - general, - attribute as attribute_db_bo, - tokenization as tokenization_db_bo, - project as project_db_bo, - notification as notification_db_bo, - record as record_db_bo, -) -from submodules.model.cognition_objects import project as cognition_project - -from util import notification - -from controller.upload_task import manager as upload_task_manager -from controller.labeling_task import manager as labeling_task_manager -from controller.labeling_task_label import manager as label_manager -from controller.information_source import manager as information_source_manager -from controller.attribute import manager as attribute_manager -from controller.task_master import manager as task_master_manager -from controller.embedding import manager as embedding_manager - -from .bricks_loader import get_bricks_code_from_group, get_bricks_code_from_endpoint -from .constants import ( - CognitionProjects, - DEFAULT_MODEL, - MODEL_DOC2QUERY, - FREE_API_REQUEST_URL, -) -from .util import send_log_message -import traceback - - -class TokenRef: - def __init__(self): - self._token = general.get_ctx_token() - - def request_new(self): - self._token = general.remove_and_refresh_session(self._token, True) - - def cleanup(self): - general.remove_and_refresh_session(self._token, False) - - -def prepare_and_finalize_setup(cognition_project_id: str, task_id: str) -> None: - token_ref = TokenRef() - try: - __finalize_setup(token_ref, cognition_project_id, task_id) - except Exception as e: - print(f"Error during wizard setup: {str(e)}", flush=True) - print(traceback.format_exc()) - finally: - token_ref.cleanup() - - -def __finalize_setup( - token_ref: TokenRef, cognition_project_id: str, task_id: str -) -> None: - cognition_project_item = cognition_project.get(cognition_project_id) - cognition_project_item.state = enums.CognitionProjectState.WIZARD_RUNNING.value - general.commit() - # unbind to prevent session issues - organization_id = str(cognition_project_item.organization_id) - reference_project_id = str(cognition_project_item.refinery_references_project_id) - question_project_id = str(cognition_project_item.refinery_question_project_id) - relevance_project_id = str(cognition_project_item.refinery_relevance_project_id) - - project_language = str(project_db_bo.get(reference_project_id).tokenizer_blank) - - task = upload_task_manager.get_upload_task( - task_id=task_id, - project_id=reference_project_id, - ) - if not task: - raise ValueError("Task not found") - file_additional_info = json.loads(task.file_additional_info) - - user_id = str(task.user_id) - - send_log_message( - question_project_id, - "Generating questions based on references", - ) - # first add actual records to question & relevance - if __add_records_to_question_and_relevance( - reference_project_id, - question_project_id, - relevance_project_id, - user_id, - project_language, - 32, - ): - send_log_message( - question_project_id, - "Generating questions based on references - finished", - ) - else: - send_log_message( - question_project_id, - "Couldn't generate enough question data - stopping wizard", - True, - ) - return - - # then add additional tasks to queue - - task_list = [] - __finalize_setup_for( - CognitionProjects.REFERENCE, - reference_project_id, - organization_id, - user_id, - project_language, - file_additional_info, - task_list, - token_ref, - ) - notification.send_organization_update( - cognition_project_id, - "cognition_wizard:prep:REFERENCE:COMPLETE", - organization_id=organization_id, - ) - - __finalize_setup_for( - CognitionProjects.QUESTION, - question_project_id, - organization_id, - user_id, - project_language, - file_additional_info, - task_list, - token_ref, - ) - - # sample data from references & send to doc to query - - notification.send_organization_update( - cognition_project_id, - "cognition_wizard:prep:QUESTION:COMPLETE", - organization_id=organization_id, - ) - - ## additional question task creation - qdrant_filter = file_additional_info.get("qdrant_filter", []) - - for item in qdrant_filter: - labels = [] - if item["type"] == "ATTRIBUTE": - labels = attribute_db_bo.get_unique_values( - reference_project_id, item["name"] - ) - __create_task_and_labels_for(question_project_id, item["name"], labels) - - __finalize_setup_for( - CognitionProjects.RELEVANCE, - relevance_project_id, - organization_id, - user_id, - project_language, - file_additional_info, - task_list, - token_ref, - ) - notification.send_organization_update( - cognition_project_id, - "cognition_wizard:prep:RELEVANCE:COMPLETE", - organization_id=organization_id, - ) - task_list.append( - { - "organization_id": organization_id, - "project_id": reference_project_id, - "task_type": enums.TaskType.TASK_QUEUE_ACTION.value, - "action": { - "action_type": enums.TaskQueueAction.FINISH_COGNITION_SETUP.value, - "cognition_project_id": cognition_project_id, - }, - } - ) - - # wait for initial tokenization to finish - c = 0 - while True: - time.sleep(1) - c += 1 - if c > 120: - token_ref.request_new() - c = 0 - if tokenization_db_bo.is_doc_bin_creation_running_or_queued( - reference_project_id - ): - continue - if tokenization_db_bo.is_doc_bin_creation_running_or_queued( - question_project_id - ): - continue - if tokenization_db_bo.is_doc_bin_creation_running_or_queued( - relevance_project_id - ): - continue - break - - queue_response = task_master_manager.queue_task( - organization_id, - user_id, - enums.TaskType.TASK_QUEUE, - {"project_id": cognition_project_id, "task_list": task_list}, - ) - if queue_response.ok: - queue_info = queue_response.json() - task_id = queue_info["task_id"] - position = queue_info.get("position") - - notification.send_organization_update( - cognition_project_id, - f"cognition_wizard:task_queue:{task_id}:{len(task_list)}", - organization_id=organization_id, - ) - if position: - notification.send_organization_update( - cognition_project_id, - f"task_queue:{str(task_id)}:QUEUE_POSITION:{position}", - organization_id=organization_id, - ) - - -# function called from queue as last entry -def finish_cognition_setup( - cognition_project_id: str, -) -> None: - ctx_token = general.get_ctx_token() - cognition_project_item = cognition_project.get(cognition_project_id) - if not cognition_project_item: - general.remove_and_refresh_session(ctx_token, False) - return - user_id = str(cognition_project_item.created_by) - notification_db_bo.set_notifications_to_not_initial( - str(cognition_project_item.refinery_references_project_id), user_id - ) - notification_db_bo.set_notifications_to_not_initial( - str(cognition_project_item.refinery_question_project_id), user_id - ) - notification_db_bo.set_notifications_to_not_initial( - str(cognition_project_item.refinery_relevance_project_id), user_id - ) - - cognition_project_item.state = enums.CognitionProjectState.DEVELOPMENT.value - organization_id = str(cognition_project_item.organization_id) - general.commit() - general.remove_and_refresh_session(ctx_token, False) - notification.send_organization_update( - cognition_project_id, - "cognition_prep:state:DONE", - organization_id=organization_id, - ) - - -def __add_websocket_message_queue_item( - sender_project_id: str, - msg: str, - task_list: List[Dict[str, str]], - organization_id: Optional[str] = None, # needs to be set for cognition project ids -) -> None: - action = { - "action_type": enums.TaskQueueAction.SEND_WEBSOCKET.value, - "project_id": sender_project_id, - "message": msg, - } - if organization_id: - action["organization_id"] = organization_id - task_list.append( - { - "organization_id": organization_id, - "task_type": enums.TaskType.TASK_QUEUE_ACTION.value, - "action": action, - } - ) - - -def __add_weakly_supervise_all_valid( - project_id: str, - org_id: str, - task_list: List[Dict[str, str]], -) -> None: - task_list.append( - { - "organization_id": org_id, - "task_type": enums.TaskType.TASK_QUEUE_ACTION.value, - "action": { - "action_type": enums.TaskQueueAction.RUN_WEAK_SUPERVISION.value, - "project_id": project_id, - }, - } - ) - - -# task_list is appended with post processing steps for the task queue -def __finalize_setup_for( - project_type: CognitionProjects, - project_id: str, - org_id: str, - user_id: str, - project_language: str, - file_additional_info: Dict[str, Any], - task_list: List[Dict[str, str]], - token_ref: TokenRef, -) -> Token: - target_data = {"TARGET_LANGUAGE": project_language} - - # attributes - attributes = project_type.get_attributes() - target_data["target_type"] = "ac" - if attributes: - for attribute in attributes: - if not attribute: - # placeholder item - continue - run_code = attribute.get("run_code", True) - bricks = attribute.get("bricks") - if bricks: - __load_ac_from_bricks_group( - project_id, - org_id, - bricks["group"], - bricks.get("type", "generator"), - bricks.get("type_lookup", {}), - target_data, - task_list, - append_to_task_list=run_code, - ) - else: - code = attribute.get("code") - if not code: - code_build = attribute.get("code_build") - if not code_build: - raise ValueError("No code or code_build given") - - target_data["ATTRIBUTE"] = code_build.get( - "target_attribute", "reference" - ) - code = get_bricks_code_from_endpoint( - code_build["endpoint"], target_data - ) - __create_attribute_with( - project_id, - org_id, - code, - attribute["name"], - attribute["type"], - task_list, - run_code, - ) - - # tasks + functions - labeling_tasks = project_type.get_labeling_tasks() - task_lookup = {} # name -> id - target_data["target_type"] = "lf" - if labeling_tasks: - for task in labeling_tasks: - task_type = task.get("task_type") - task_attribute = task.get("target_attribute") - if ( - task_type == enums.LabelingTaskType.INFORMATION_EXTRACTION.value - and not task_attribute - ): - send_log_message( - project_id, - "Can't create extraction task without target attribute", - True, - ) - continue - if task_attribute: - task_attribute = str( - attribute_db_bo.get_by_name(project_id, task_attribute).id - ) - labeling_task_id = __create_task_and_labels_for( - project_id, - task["name"], - task["labels"], - task_type, - target_attribute_id=task_attribute, - ) - task_lookup[task["name"]] = labeling_task_id - bricks = task.get("bricks") - if bricks: - target_attribute = bricks.get("target_attribute", "reference") - target_data["ATTRIBUTE"] = target_attribute - __load_lf_from_bricks_group( - project_id, - labeling_task_id, - user_id, - org_id, - bricks["group"], - bricks.get("type", "classifier"), - target_data, - task_list, - name_prefix=bricks.get("function_prefix"), - ) - - # embeddings - selected_filter_attributes = [ - c["name"] - for c in file_additional_info.get("qdrant_filter", []) - if c["type"] == "ATTRIBUTE" - ] - - embeddings = project_type.get_embeddings() - target_data["target_type"] = "al" - if embeddings: - for embedding in embeddings: - filter_columns = embedding.get("filter") - if filter_columns == "FROM_WIZARD": - filter_columns = selected_filter_attributes - else: - filter_columns = [] - embedding_name = __add_embedding( - project_id, - org_id, - embedding.get("target", {}), - project_language, - filter_columns, - embedding.get("outlier_slice", False), - task_list, - ) - bricks = embedding.get("bricks") - if bricks: - target_data["EMBEDDING"] = embedding_name - labeling_task_id = task_lookup.get(bricks.get("target_task_name")) - if not labeling_task_id: - send_log_message( - project_id, - "Can't create active learner without task name", - True, - ) - continue - __load_active_learner_from_bricks_group( - project_id, - labeling_task_id, - user_id, - bricks["group"], - bricks.get("type", "classifier"), - target_data, - name_prefix=bricks.get("function_prefix"), - ) - __add_weakly_supervise_all_valid(project_id, org_id, task_list) - token_ref.request_new() - - -def __create_task_and_labels_for( - project_id: str, - task_name: str, - labels: List[str], - task_type: Optional[str] = None, - target_attribute_id: Optional[str] = None, -) -> str: - if task_type is None: - task_type = enums.LabelingTaskType.CLASSIFICATION.value - - task_item = labeling_task_manager.get_labeling_task_by_name(project_id, task_name) - if not task_item: - task_item = labeling_task_manager.create_labeling_task( - project_id, task_name, task_type, target_attribute_id - ) - else: - existing = set([label.name for label in task_item.labels]) - labels = [label for label in labels if label not in existing] - - label_manager.create_labels(project_id, str(task_item.id), labels) - return str(task_item.id) - - -def __load_lf_from_bricks_group( - target_project_id: str, - target_task_id: str, - user_id: str, - org_id: str, - group_key: str, - bricks_type: str, - target_data: Dict[str, str], - task_list: List[Dict[str, str]], - language_key: Optional[str] = None, - name_prefix: Optional[str] = None, - append_to_task_list: bool = True, -) -> None: - bricks_in_group = get_bricks_code_from_group( - group_key, - bricks_type, - language_key, - target_data, - name_prefix, - project_id=target_project_id, - ) - for name in bricks_in_group: - item = information_source_manager.create_information_source( - target_project_id, - user_id, - target_task_id, - name, - bricks_in_group[name]["code"], - "", - enums.InformationSourceType.LABELING_FUNCTION.value, - ) - if append_to_task_list: - task_list.append( - { - "organization_id": org_id, - "project_id": target_project_id, - "task_type": enums.TaskType.INFORMATION_SOURCE.value, - "information_source_id": str(item.id), - "source_type": enums.InformationSourceType.LABELING_FUNCTION.value, - } - ) - - -def __load_active_learner_from_bricks_group( - target_project_id: str, - target_task_id: str, - user_id: str, - group_key: str, - bricks_type: str, - target_data: Dict[str, str], - language_key: Optional[str] = None, - name_prefix: Optional[str] = None, -) -> None: - bricks_in_group = get_bricks_code_from_group( - group_key, - bricks_type, - language_key, - target_data, - name_prefix, - project_id=target_project_id, - ) - for name in bricks_in_group: - information_source_manager.create_information_source( - target_project_id, - user_id, - target_task_id, - name.replace("_", " ") - .title() - .replace(" ", ""), # to pascal case (e.g. random_forest -> RandomForest) - bricks_in_group[name]["code"], - "", - enums.InformationSourceType.ACTIVE_LEARNING.value, - ) - - -def __load_ac_from_bricks_group( - target_project_id: str, - org_id: str, - group_key: str, - bricks_type: str, - data_type_lookup: Dict[str, str], - target_data: Dict[str, str], - task_list: List[Dict[str, str]], - language_key: Optional[str] = None, - name_prefix: Optional[str] = None, - append_to_task_list: bool = True, -) -> None: - bricks_in_group = get_bricks_code_from_group( - group_key, - bricks_type, - language_key, - target_data, - name_prefix, - project_id=target_project_id, - ) - for name in bricks_in_group: - code = bricks_in_group[name]["code"] - data_type = data_type_lookup.get( - bricks_in_group[name]["endpoint"], enums.DataTypes.TEXT.value - ) - __create_attribute_with( - target_project_id, - org_id, - code, - name, - data_type, - task_list, - append_to_task_list=append_to_task_list, - ) - - -def __add_embedding( - target_project_id: str, - org_id: str, - target_info: Dict[str, str], - project_language: str, - filter_columns: List[str], - create_outlier_slice: bool, - task_list: List[Dict[str, str]], -) -> str: - target_attribute = target_info.get("attribute", "reference") - target_platform = target_info.get("platform", "huggingface") - target_model = None - if target_platform != enums.EmbeddingPlatform.COHERE.value: - target_model = target_info.get("model", {}) - if isinstance(target_model, dict): - target_model = target_model.get( - project_language, DEFAULT_MODEL[target_platform] - ) - target_embedding_type = target_info.get("embedding_type", "ON_ATTRIBUTE") - target_api_token = target_info.get("api_token") - attribute_item = attribute_db_bo.get_by_name(target_project_id, target_attribute) - if not attribute_item: - return - attribute_id = str(attribute_item.id) - embedding_name = embedding_manager.get_embedding_name( - target_project_id, - attribute_id, - target_platform, - target_embedding_type, - target_model, - target_api_token, - ) - task_list.append( - { - "organization_id": org_id, - "project_id": target_project_id, - "task_type": enums.TaskType.EMBEDDING.value, - "embedding_type": target_embedding_type, - "attribute_id": attribute_id, - "embedding_name": embedding_name, - "platform": target_platform, - "model": target_model, - "api_token": target_api_token, - "terms_text": embedding_manager.get_current_terms_text(target_platform), - "terms_accepted": target_api_token is not None, - "filter_attributes": filter_columns, - "additional_data": None, - } - ) - if create_outlier_slice: - task_list.append( - { - "organization_id": org_id, - "project_id": target_project_id, - "task_type": enums.TaskType.TASK_QUEUE_ACTION.value, - "action": { - "action_type": enums.TaskQueueAction.CREATE_OUTLIER_SLICE.value, - "embedding_name": embedding_name, - "project_id": target_project_id, - }, - } - ) - return embedding_name - - -def __create_attribute_with( - project_id: str, - org_id: str, - code: str, - name: str, - attribute_type: str, - task_list: List[Dict[str, str]], - append_to_task_list: bool = True, -) -> str: - attribute_item = attribute_manager.create_user_attribute( - project_id, name, attribute_type - ) - attribute_item.source_code = code - general.commit() - attribute_id = str(attribute_item.id) - if append_to_task_list: - task_list.append( - { - "organization_id": org_id, - "project_id": project_id, - "task_type": enums.TaskType.ATTRIBUTE_CALCULATION.value, - "attribute_id": attribute_id, - } - ) - return attribute_id - - -def __add_records_to_question_and_relevance( - reference_project_id: str, - question_project_id: str, - relevance_project_id: str, - user_id: str, - language: str, - amount: int, -) -> bool: - sample_facts = record_db_bo.get_sample_data_of( - reference_project_id, - "reference", - amount, - "LENGTH(data->>'reference') BETWEEN 5 AND 1024", - ) - - if len(sample_facts) < amount: - send_log_message( - question_project_id, - "Not enough sample data - we need at least 32 references between 5 and 1024 characters", - True, - ) - return False - - questions = __call_doc_2_query_free(language, sample_facts) - - if len(questions) != amount: - send_log_message( - question_project_id, - "Not enough query data - this shouldn't happen - contact support", - True, - ) - return False - - max_running_id_qu = record_db_bo.get_max_running_id(question_project_id) + 1 - max_running_id_re = record_db_bo.get_max_running_id(relevance_project_id) + 1 - final_json_to_add_questions = [] - final_json_to_add_relevance = [] - for idx, (reference, question) in enumerate(zip(sample_facts, questions)): - final_question = question - if "?" not in final_question: - final_question += "?" - final_question = final_question[0].title() + final_question[1:] - message_id = "mr-" + str(idx) - final_json_to_add_questions.append( - { - "running_id": max_running_id_qu + idx, - "message_id": message_id, - "question": final_question, - "question_prev_3": None, - "answer_prev_3": None, - "question_prev_2": None, - "answer_prev_2": None, - "question_prev_1": None, - "answer_prev_1": None, - "conversation_id": None, - } - ) - max_running_id_re += 1 - final_json_to_add_relevance.append( - { - "running_id": max_running_id_re + idx, - "question": final_question, - "message_id": message_id, - "reference": reference, - "__Fact is relevant": "Yes", - } - ), - __post_to_refinery_project( - question_project_id, user_id, final_json_to_add_questions - ) - __post_to_refinery_project( - relevance_project_id, user_id, final_json_to_add_relevance - ) - return True - - -def __post_to_refinery_project( - project_id: str, user_id: str, records: List[Dict[str, str]] -) -> None: - requests.post( - f"http://refinery-gateway:80/project/{project_id}/import_json", - json={ - "user_id": user_id, - "records": records, - "request_uuid": str(uuid4()), - "is_last": True, # or False - }, - ) - - -def __call_doc_2_query_free(language: str, texts_to_query: List[str]) -> List[str]: - if language not in MODEL_DOC2QUERY: - raise ValueError("Language not yet supported") - - response = requests.post( - FREE_API_REQUEST_URL, - json={"model_name": MODEL_DOC2QUERY.get(language), "text": texts_to_query}, - ) - response.raise_for_status() - return [r["generated_text"] for r in response.json()] - - -def dummy(): - pass From 2ffd3d35edbd9a8200766e5d58f6f1e991fd8939 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Thu, 12 Dec 2024 13:49:52 +0100 Subject: [PATCH 16/23] Remove wizard import --- controller/transfer/cognition/import_wizard.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 controller/transfer/cognition/import_wizard.py diff --git a/controller/transfer/cognition/import_wizard.py b/controller/transfer/cognition/import_wizard.py deleted file mode 100644 index e69de29b..00000000 From 4a2395f7d681fed913ac5b888ad6e6cc20f550f9 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Thu, 12 Dec 2024 13:52:08 +0100 Subject: [PATCH 17/23] Remove wizard template --- .../cognition/wizard_function_templates.py | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/controller/transfer/cognition/wizard_function_templates.py b/controller/transfer/cognition/wizard_function_templates.py index 7e318e5f..e69de29b 100644 --- a/controller/transfer/cognition/wizard_function_templates.py +++ b/controller/transfer/cognition/wizard_function_templates.py @@ -1,21 +0,0 @@ -REFERENCE_CHUNKS_SENT = """ -def ac(record): - # e.g. use spacy sentences to create a list - return [r.text for r in record["@@target_attribute@@"].sents] -""" - -REFERENCE_CHUNKS_SPLIT = """ -def ac(record): - splits = [t.strip() for t in record["@@target_attribute@@"].text.split("\\n")] - return [val for val in splits if len(val) > 0] -""" - -MAPPING_WRAPPER = """ -def @@target_name@@(record): - #this is a wrapper to map the labels to your project - result = bricks_base_function(record) - if result in my_custom_mapping: - result = my_custom_mapping[result] - if result: - return result -""" From 96461096320b247fdb11260077d256a881a96aa8 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Thu, 12 Dec 2024 13:52:17 +0100 Subject: [PATCH 18/23] Remove wizard template --- controller/transfer/cognition/wizard_function_templates.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 controller/transfer/cognition/wizard_function_templates.py diff --git a/controller/transfer/cognition/wizard_function_templates.py b/controller/transfer/cognition/wizard_function_templates.py deleted file mode 100644 index e69de29b..00000000 From af5704a51e36ec06ee6f45a8adc8fcaa48bdfe65 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Thu, 12 Dec 2024 13:54:34 +0100 Subject: [PATCH 19/23] Remove constants and bricks_loader --- .../transfer/cognition/bricks_loader.py | 232 ----------------- controller/transfer/cognition/constants.py | 245 ------------------ 2 files changed, 477 deletions(-) diff --git a/controller/transfer/cognition/bricks_loader.py b/controller/transfer/cognition/bricks_loader.py index 59a98890..e69de29b 100644 --- a/controller/transfer/cognition/bricks_loader.py +++ b/controller/transfer/cognition/bricks_loader.py @@ -1,232 +0,0 @@ -from typing import List, Dict, Optional, Any -import requests -import json -import re -from .wizard_function_templates import MAPPING_WRAPPER -from .util import send_log_message - -# this is a light implementation of the bricks loader, some variables etc. will error out - -BASE_URL = "https://cms.bricks.kern.ai/api/modules/?pagination[pageSize]=500" - -FUNCTION_REGEX = re.compile( - r"^def\s(\w+)(\([a-zA-Z0-9_:\[\]=, ]*\)):\s*$", re.MULTILINE -) -CLASS_REGEX = re.compile(r"^class ([\w]+)\(([^)]+)\):$", re.MULTILINE) -VARIABLE_REGEX = re.compile( - r"""^(([A-Z_]+):\s*(\w+)\s*=\s*(['"])*([ \w\_\-\<\>]+)(['"])*)""", re.MULTILINE -) - -AL_BACKEND_NAME = "ATLClassifier" -LF_BACKEND_NAME = "lf" -AC_BACKEND_NAME = "ac" - - -# language not yet supported by bricks -# returns a dict of "name": {"code": "code", "endpoint": "endpoint"} -def get_bricks_code_from_group( - group_key: str, - bricks_type: str, # "classifier" or "extractor" or "generator" - language_key: str, - target_data: Dict[str, str], - name_prefix: Optional[str] = None, - project_id: Optional[str] = None, -) -> Dict[str, str]: - if not name_prefix: - name_prefix = "" - bricks_infos = __get_bricks_config_by_group( - group_key, - bricks_type, - active_learner=target_data.get("target_type") == "al", - language_key=language_key, - project_id=project_id, - ) - - values = { - f"{name_prefix}{b['attributes']['endpoint']}": { - "code": __light_parse_bricks_code(b, target_data), - "endpoint": b["attributes"]["endpoint"], - } - for b in bricks_infos - } - return values - - -def __get_bricks_config_by_group( - group_key: str, - module_type: str = "classifier", - active_learner: bool = False, - language_key: Optional[str] = None, - project_id: Optional[str] = None, -) -> List[Dict]: - url = BASE_URL + "&filters[moduleType][$eq]=" + module_type - execution_type = "activeLearner" if active_learner else "pythonFunction" - url += "&filters[executionType][$eq]=" + execution_type - - url += '&filters[partOfGroup][$contains]="' + group_key + '"' - if language_key: - url += "&filters[language][$in][0]=multi" - url += "&filters[language][$in][0]=" + language_key - - response = requests.get(url) - if response.status_code != 200: - raise Exception("Could not load bricks from CMS") - - bricks_info = response.json() - data = bricks_info["data"] - if len(data) == 0 and project_id: - send_log_message( - project_id, - f"Found no entries from bricks group {group_key} - executionType {execution_type}", - True, - ) - for bricks_info in data: - bricks_info["attributes"]["integratorInputs"] = json.loads( - bricks_info["attributes"]["integratorInputs"] - ) - return data - - -# for singular bricks (e.g. language detection) -def get_bricks_code_from_endpoint(endpoint: str, target_data: Dict[str, str]) -> str: - bricks_info = __get_bricks_config_by_endpoint_name(endpoint) - return __light_parse_bricks_code(bricks_info, target_data) - - -def __get_bricks_config_by_endpoint_name(endpoint: str) -> Dict: - url = BASE_URL + "&filters[endpoint][$eq]=" + endpoint - response = requests.get(url) - if response.status_code != 200: - raise Exception("Could not load bricks from CMS") - - bricks_info = response.json() - data = bricks_info["data"] - if len(data) != 1: - raise Exception( - f"Found {len(data)} entries from endpoint {endpoint} expected exactly 1" - ) - bricks_info = data[0] - bricks_info["attributes"]["integratorInputs"] = json.loads( - bricks_info["attributes"]["integratorInputs"] - ) - return bricks_info - - -def __light_parse_bricks_code( - bricks_info: Dict[str, Any], target_data: Dict[str, str] -) -> str: - deletion_keys = [] - cognition_mapping = __parse_cognition_mapping( - bricks_info, target_data, deletion_keys - ) - has_mapping = cognition_mapping is not None and len(cognition_mapping.keys()) > 0 - - target_name = __get_target_name(target_data) - - code = bricks_info["attributes"]["sourceCodeRefinery"] - - if target_name == AL_BACKEND_NAME: - code = __replace_class_name_in_code(code, target_name) - else: - code = __replace_function_name_in_code(code, target_name, has_mapping) - - code = __replace_variables_in_code(code, target_data) - - if has_mapping: - code = __extend_code_by_mapping(code, cognition_mapping) - for key in deletion_keys: - del target_data[key] - return code - - -def __get_target_name(target_data: Dict[str, str]) -> str: - target_type = target_data.get("target_type", "lf") - if target_type == "lf": - return LF_BACKEND_NAME - elif target_type == "ac": - return AC_BACKEND_NAME - elif target_type == "al": - return AL_BACKEND_NAME - else: - raise Exception(f"Unknown target type {target_type}") - - -def __parse_cognition_mapping( - bricks_info: Dict[str, Any], target_data: Dict[str, str], deletion_keys: List[str] -) -> Optional[Dict[str, str]]: - mapping_string = bricks_info["attributes"].get("cognitionInitMapping") - if not mapping_string: - return None - cognition_mapping = json.loads(mapping_string) - if cognition_mapping: - keys = list(cognition_mapping.keys()) - for key in keys: - if cognition_mapping[key] == "null": - cognition_mapping[key] = None - # items with @@@@ are default values not actual mapping - if key.startswith("@@") and key.endswith("@@"): - target_data[key[2:-2]] = cognition_mapping[key] - del cognition_mapping[key] - deletion_keys.append(key[2:-2]) - return cognition_mapping - - -def __replace_function_name_in_code( - code: str, target_name: str, has_mapping: bool -) -> str: - found = re.search(FUNCTION_REGEX, code) - if not found: - raise Exception("Could not find function in code") - - new_fn_name = target_name - mapping_extension = "" - if has_mapping: - new_fn_name = "bricks_base_function" - mapping_extension = MAPPING_WRAPPER.replace("@@target_name@@", target_name) - - replace_code = f"{mapping_extension}\ndef {new_fn_name}{found.group(2)}:" - - code = re.sub(FUNCTION_REGEX, replace_code, code, 1) - return code - - -def __replace_class_name_in_code(code: str, target_name: str) -> str: - found = re.search(CLASS_REGEX, code) - if not found: - raise Exception("Could not find class in code") - - replace_code = f"class {target_name}({found.group(2)}):" - - code = re.sub(CLASS_REGEX, replace_code, code, 1) - return code - - -def __replace_variables_in_code(code: str, target_data: Dict[str, str]) -> str: - groups = re.findall(VARIABLE_REGEX, code) - for found in groups: - full_match, g1, g2, g3, g4, g5 = found - target_name = target_data.get(g1) - if not target_name: - # no value to set so default code is used - continue - - code = code.replace( - full_match, - f"{g1}: {g2} = {g3}{target_name}{g5}", - 1, - ) - - return code - - -def __extend_code_by_mapping(code: str, mapping: Dict[str, str]): - mapping_block = "#generated by the bricks integrator\nmy_custom_mapping = {" - for key in mapping: - mapping_block += f'\n "{key}": ' - if mapping[key]: - mapping_block += f'"{mapping[key]}"' - else: - mapping_block += "None" - mapping_block += "," - mapping_block += "\n}" - return code + "\n\n" + mapping_block diff --git a/controller/transfer/cognition/constants.py b/controller/transfer/cognition/constants.py index af5bfe30..e69de29b 100644 --- a/controller/transfer/cognition/constants.py +++ b/controller/transfer/cognition/constants.py @@ -1,245 +0,0 @@ -from enum import Enum -from submodules.model.enums import DataTypes, EmbeddingPlatform, LabelingTaskType -from .wizard_function_templates import REFERENCE_CHUNKS_SENT - - -class CognitionProjects(Enum): - REFERENCE = "REFERENCE" - QUESTION = "QUESTION" - RELEVANCE = "RELEVANCE" - - def get_labeling_tasks(self): - return TASK_INFO[self].get("labeling_tasks") - - def get_attributes(self): - return TASK_INFO[self].get("attributes") - - def get_embeddings(self): - return TASK_INFO[self].get("embeddings") - - -TASK_INFO = { - CognitionProjects.REFERENCE: { - "labeling_tasks": [ - { - "name": "Reference Quality", - "labels": ["Sufficient", "Needs fix"], - "bricks": { - "group": "reference_quality", - # "function_prefix": "RQ_", - }, - }, - { - "name": "Reference Complexity", - "labels": ["Low", "Medium", "High"], - "bricks": { - "group": "reference_complexity", - }, - }, - # currently removed since not part of initial offering - # { - # "name": "Reference Type", - # "labels": ["Unknown"], - # # "bricks": { - # # "group": "reference_type", - # # }, - # }, - { - "name": "Personal Identifiable Information (PII)", - "labels": [ - "Person", - "Date", - "Time", - "Organization", - "IP Address", - "Phone number", - "URL", - "E-Mail", - "Zip code", - "Location", - ], - "task_type": LabelingTaskType.INFORMATION_EXTRACTION.value, - "target_attribute": "reference", - "bricks": { - "group": "personal_identifiers", - "type": "extractor", - }, - }, - ], - "attributes": [ - { - "name": "Language", - "type": DataTypes.CATEGORY.value, - "code_build": {"endpoint": "language_detection"}, - }, - { - "name": "reference_chunks", - "type": DataTypes.EMBEDDING_LIST.value, - "code": REFERENCE_CHUNKS_SENT.replace( - "@@target_attribute@@", "reference" - ), - }, - ], - "embeddings": [ - { - "target": { - "attribute": "reference", - "platform": "huggingface", - "model": { - "de": "bert-base-german-cased", - "en": "distilbert-base-uncased", - }, - }, - "filter": "FROM_WIZARD", - "outlier_slice": True, - "bricks": { - "group": "active_learner", - "target_task_name": "Reference Quality", - }, - }, - { - "target": { - "attribute": "reference_chunks", - "platform": "huggingface", - "model": { - "de": "bert-base-german-cased", - "en": "distilbert-base-uncased", - }, - }, - "filter": "FROM_WIZARD", - "outlier_slice": True, - # "bricks": { - # "group": "active_learner", - # "target_task_name": "Reference Complexity", - # }, - }, - ], - }, - CognitionProjects.QUESTION: { - "labeling_tasks": [ - { - "name": "Communication Style", - "labels": [ - "Action-seeking", - "Fact-oriented", - "Information-seeking", - "Self-revealing", - ], - "bricks": { - "group": "communication_style", - "target_attribute": "question", - }, - }, - { - "name": "Question Type", - "labels": [ - "Keyword-question", - "Interrogative-question", - "Statement-question", - ], - "bricks": {"group": "question_type", "target_attribute": "question"}, - }, - { - "name": "Question Quality", - "labels": ["Good", "Bad"], - # "bricks": {"group": "question_quality", "target_attribute": "question"}, - }, - { - "name": "Question Complexity", - "labels": ["Low", "Medium", "High"], - "bricks": { - "group": "question_complexity", - "target_attribute": "question", - }, - }, - ], - "attributes": [ - # { - # "bricks": { - # "group": "rephrased_query", - # "target_attribute": "question" - # # "type_lookup": { - # # # defaults to text - # # "euclidean_distance": DataTypes.FLOAT.value, - # # }, - # }, - # "run_code": False, - # }, - { - "name": "search_queries", - "type": DataTypes.EMBEDDING_LIST.value, - "code": REFERENCE_CHUNKS_SENT.replace( - "@@target_attribute@@", "question" - ), - }, - ], - "embeddings": [ - { - "target": { - "attribute": "question", - "platform": "huggingface", - "model": { - "de": "bert-base-german-cased", - "en": "distilbert-base-uncased", - }, - }, - "outlier_slice": False, - } - ], - }, - CognitionProjects.RELEVANCE: { - "labeling_tasks": [ - { - "name": "Fact is relevant", - "labels": [ - "Yes", - "No", - ], - # "bricks": { - # "group": "reference_relevance", - # }, - }, - ], - "attributes": [ - # { - # "bricks": { - # "group": "argumentation_llm", - # "target_attribute": "question" - # # "type_lookup": { - # # # defaults to text - # # "euclidean_distance": DataTypes.FLOAT.value, - # # }, - # }, - # "run_code": False, - # }, - ], - "embeddings": [ - { - "target": { - "attribute": "question", - "platform": "huggingface", - "model": { - "de": "bert-base-german-cased", - "en": "distilbert-base-uncased", - }, - }, - "outlier_slice": False, - } - ], - }, -} - -DEFAULT_MODEL = { - EmbeddingPlatform.AZURE.value: None, - EmbeddingPlatform.COHERE.value: None, - EmbeddingPlatform.HUGGINGFACE.value: "distilbert-base-uncased", - EmbeddingPlatform.OPENAI.value: "text-embedding-ada-002", - EmbeddingPlatform.PYTHON.value: "bag-of-words", -} - -MODEL_DOC2QUERY = { - "de": "doc2query/msmarco-german-mt5-base-v1", - "en": "doc2query/msmarco-t5-base-v1", -} - -FREE_API_REQUEST_URL = "https://k8s.freeapi.kern.ai/inference" From 05504d2f89036cc17f03a17e249cc743c93bb814 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Thu, 12 Dec 2024 13:54:48 +0100 Subject: [PATCH 20/23] Remove constants and bricks_loader --- controller/transfer/cognition/bricks_loader.py | 0 controller/transfer/cognition/constants.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 controller/transfer/cognition/bricks_loader.py delete mode 100644 controller/transfer/cognition/constants.py diff --git a/controller/transfer/cognition/bricks_loader.py b/controller/transfer/cognition/bricks_loader.py deleted file mode 100644 index e69de29b..00000000 diff --git a/controller/transfer/cognition/constants.py b/controller/transfer/cognition/constants.py deleted file mode 100644 index e69de29b..00000000 From 9d650478e8ceb603a25c2bc00e8b32c1ee8adbf2 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Thu, 12 Dec 2024 13:57:59 +0100 Subject: [PATCH 21/23] Remove unused import_json --- api/transfer.py | 38 ++------------------------------------ app.py | 2 -- 2 files changed, 2 insertions(+), 38 deletions(-) diff --git a/api/transfer.py b/api/transfer.py index 2f0268f0..a49ca365 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -3,7 +3,7 @@ import time from typing import Optional from starlette.endpoints import HTTPEndpoint -from starlette.responses import PlainTextResponse, JSONResponse +from starlette.responses import PlainTextResponse from controller.embedding.manager import recreate_embeddings from controller.transfer.cognition import ( @@ -19,11 +19,9 @@ from controller.transfer import manager as transfer_manager from controller.upload_task import manager as upload_task_manager -from controller.auth import manager as auth_manager -from controller.project import manager as project_manager from controller.attribute import manager as attribute_manager -from submodules.model import enums, exceptions +from submodules.model import enums from util.notification import create_notification from submodules.model.enums import NotificationType from submodules.model.models import UploadTask @@ -39,38 +37,6 @@ logger = logging.getLogger(__name__) -class JSONImport(HTTPEndpoint): - async def post(self, request) -> JSONResponse: - project_id = request.path_params["project_id"] - request_body = await request.json() - user_id = request_body["user_id"] - auth_manager.check_project_access_from_user_id(user_id, project_id) - - records = request_body["records"] - - project = project_manager.get_project(project_id) - num_project_records = len(project.records) - for att in project.attributes: - if att.is_primary_key: - for idx, record in enumerate(records): - if att.name not in record: - if att.name == "running_id": - records[idx][att.name] = num_project_records + idx + 1 - else: - raise exceptions.InvalidInputException( - f"Non-running-id, primary key {att.name} missing in record" - ) - - transfer_manager.import_records_from_json( - project_id, - user_id, - records, - request_body["request_uuid"], - request_body["is_last"], - ) - return JSONResponse({"success": True}) - - class Notify(HTTPEndpoint): async def post(self, request) -> PlainTextResponse: data = await request.json() diff --git a/app.py b/app.py index 95724433..f152011f 100644 --- a/app.py +++ b/app.py @@ -7,7 +7,6 @@ FullConfigRest, ) from api.transfer import ( - JSONImport, Notify, ) from config_handler import ( @@ -117,7 +116,6 @@ Route("/full_config", FullConfigRest), Route("/notify/{path:path}", Notify), Route("/healthcheck", Healthcheck), - Route("/project/{project_id:str}/import_json", JSONImport), Mount("/api", app=fastapi_app, name="REST API"), Mount( "/internal/api", app=fastapi_app_internal, name="INTERNAL REST API" From e297554f443e3fb7ef32913b2312738c40cfc6a5 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Thu, 12 Dec 2024 14:11:50 +0100 Subject: [PATCH 22/23] remove unused util --- controller/transfer/cognition/util.py | 17 ----------------- 1 file changed, 17 deletions(-) delete mode 100644 controller/transfer/cognition/util.py diff --git a/controller/transfer/cognition/util.py b/controller/transfer/cognition/util.py deleted file mode 100644 index 62f9810d..00000000 --- a/controller/transfer/cognition/util.py +++ /dev/null @@ -1,17 +0,0 @@ -from typing import Optional - -from util import notification - - -def send_log_message( - sender_project_id: str, - message: str, - is_error: bool = False, - organization_id: Optional[str] = None, -) -> None: - state = "ERROR" if is_error else "INFO" - notification.send_organization_update( - sender_project_id, - f"cognition_wizard:LOG_MESSAGE:{state}:{message}", - organization_id=organization_id, - ) From fd6d8e0e48ac64227032065c2a4558e8c13ae6f8 Mon Sep 17 00:00:00 2001 From: anmarhindi Date: Thu, 12 Dec 2024 14:18:44 +0100 Subject: [PATCH 23/23] Additional removal --- .../transfer/association_transfer_manager.py | 103 ------------------ 1 file changed, 103 deletions(-) delete mode 100644 controller/transfer/association_transfer_manager.py diff --git a/controller/transfer/association_transfer_manager.py b/controller/transfer/association_transfer_manager.py deleted file mode 100644 index 631b7156..00000000 --- a/controller/transfer/association_transfer_manager.py +++ /dev/null @@ -1,103 +0,0 @@ -import traceback -from typing import Dict, Any, List -from controller.labeling_task import manager as labeling_task_manager -from controller.attribute import manager as attribute_manager -from controller.record import manager as record_manager -from util import notification - - -from controller.information_source import manager as information_source_manager -from submodules.model import enums -from submodules.model.business_objects import ( - general, - labeling_task_label, - record_label_association, -) -from submodules.model.models import RecordLabelAssociation -from controller.weak_supervision import weak_supervision_service as weak_supervision - - -def import_associations( - project_id: str, - user_id: str, - model_name: str, - labeling_task_name: str, - associations: Dict[str, Any], - indices: List[Dict[str, Any]], - source_type: str, -) -> int: - labeling_task = labeling_task_manager.get_labeling_task_by_name( - project_id, labeling_task_name - ) - - information_source = information_source_manager.get_information_source_by_name( - project_id, model_name - ) - - if information_source is None: - - if source_type == "heuristic": - description = "This is a heuristic" - type = enums.LabelSource.INFORMATION_SOURCE.value - else: - raise Exception("Unknown source type") - - information_source = information_source_manager.create_information_source( - project_id, - user_id, - labeling_task.id, - model_name, - "", - description, - type, - ) - notification.send_organization_update( - project_id, f"information_source_created:{str(information_source.id)}" - ) - - attribute_names = list(indices[0].keys()) - attribute_list = attribute_manager.get_all_attributes_by_names( - project_id, attribute_names - ) - - records = record_manager.get_records_by_composite_keys( - project_id, - indices, - attribute_list, - enums.RecordCategory.SCALE.value, # we currently don't use TEST any more - ) - record_ids = [record.id for record in list(records.values())] - - record_label_associations = [] - - label_options = labeling_task_label.get_all_by_task_id(project_id, labeling_task.id) - label_id_by_name_dict = {label.name: label.id for label in label_options} - for record_id, association in zip(record_ids, associations): - label_name, confidence = association - record_label_associations.append( - RecordLabelAssociation( - project_id=project_id, - record_id=record_id, - labeling_task_label_id=label_id_by_name_dict[label_name], - source_type=enums.LabelSource.MODEL_CALLBACK.value, - source_id=information_source.id, - return_type=enums.InformationSourceReturnType.RETURN.value, - confidence=confidence, - created_by=user_id, - ) - ) - - record_label_association.delete_by_source_id_and_record_ids( - project_id, information_source.id, record_ids - ) - general.add_all(record_label_associations, with_commit=True) - general.commit() - - try: - weak_supervision.calculate_stats_after_source_run_with_debounce( - project_id, information_source.id, user_id - ) - except: - print(traceback.format_exc()) - - return len(record_label_associations)