Skip to content

Task Master #241

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0f13b65
clean task queue
LennartSchmidtKern Aug 4, 2024
711af52
remove task endpoints
LennartSchmidtKern Aug 5, 2024
f7a7ca9
alembic
LennartSchmidtKern Aug 5, 2024
2ffd90b
add internal endpoint dummy
LennartSchmidtKern Aug 5, 2024
bc96da4
offer task executions
LennartSchmidtKern Aug 5, 2024
7ab0a01
information source endpoint
LennartSchmidtKern Aug 6, 2024
d2ffe08
return payload id
LennartSchmidtKern Aug 6, 2024
4a91bdd
information source, project id handling
LennartSchmidtKern Aug 6, 2024
abc0fbb
import wizard org ids
LennartSchmidtKern Aug 6, 2024
ca73f3e
project id tokenizer
LennartSchmidtKern Aug 6, 2024
2ec9828
attribute calculation, embedding
LennartSchmidtKern Aug 6, 2024
9097b80
embedding
LennartSchmidtKern Aug 6, 2024
8d81ae0
data slice
LennartSchmidtKern Aug 6, 2024
c8c686f
gate removal
LennartSchmidtKern Aug 6, 2024
ae9467f
remove start gates
LennartSchmidtKern Aug 7, 2024
255e2d6
task executions, wizard improvements
LennartSchmidtKern Aug 7, 2024
bd34f95
improve information source
LennartSchmidtKern Aug 7, 2024
25d6a81
clean
LennartSchmidtKern Aug 7, 2024
b7c541d
project id
LennartSchmidtKern Aug 7, 2024
8590e19
merge + model
LennartSchmidtKern Aug 8, 2024
0a08a62
model
LennartSchmidtKern Aug 8, 2024
cdfb760
task deletion
LennartSchmidtKern Aug 8, 2024
9f0c187
replace alembic + model
LennartSchmidtKern Aug 8, 2024
152b05a
PR updates
LennartSchmidtKern Aug 12, 2024
ed0d0cc
blank lines
LennartSchmidtKern Aug 12, 2024
af28a13
clean
LennartSchmidtKern Aug 12, 2024
25edf7e
Merge branch 'dev' into task-master
LennartSchmidtKern Aug 12, 2024
99b2030
model merge
LennartSchmidtKern Aug 12, 2024
573829d
test
LennartSchmidtKern Aug 12, 2024
1e8eef4
model dev
LennartSchmidtKern Aug 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions alembic/versions/af5c30d91685_remove_project_id_task_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""remove project id task queue

Revision ID: af5c30d91685
Revises: 0d587af700ce
Create Date: 2024-08-04 15:42:26.747671

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = 'af5c30d91685'  # unique id of this migration
down_revision = '0d587af700ce'  # parent revision in the migration chain
branch_labels = None  # no named branch
depends_on = None  # no cross-branch dependency


def upgrade():
    """Move task_queue ownership from project to organization and add the
    cognition.macro_execution_summary aggregation table.

    Steps:
      1. Create cognition.macro_execution_summary (per-org, per-month,
         per-macro-type execution counters) plus its lookup indexes.
      2. Replace task_queue.project_id with task_queue.organization_id,
         re-pointing the FK from project to organization.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('macro_execution_summary',
    sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
    sa.Column('organization_id', postgresql.UUID(as_uuid=True), nullable=True),
    sa.Column('creation_month', sa.Date(), nullable=True),
    sa.Column('macro_type', sa.String(), nullable=True),
    sa.Column('execution_count', sa.Integer(), nullable=True),
    sa.Column('processed_files_count', sa.Integer(), nullable=True),
    sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ondelete='CASCADE'),
    sa.PrimaryKeyConstraint('id'),
    # one summary row per (org, month, macro type)
    sa.UniqueConstraint('organization_id', 'creation_month', 'macro_type', name='unique_macro_summary'),
    schema='cognition'
    )
    op.create_index(op.f('ix_cognition_macro_execution_summary_creation_month'), 'macro_execution_summary', ['creation_month'], unique=False, schema='cognition')
    op.create_index(op.f('ix_cognition_macro_execution_summary_organization_id'), 'macro_execution_summary', ['organization_id'], unique=False, schema='cognition')
    op.add_column('task_queue', sa.Column('organization_id', postgresql.UUID(as_uuid=True), nullable=True))
    op.drop_constraint('task_queue_project_id_fkey', 'task_queue', type_='foreignkey')
    # Name the constraint explicitly (matches PostgreSQL's default
    # <table>_<column>_fkey naming) so downgrade() can drop it by name —
    # Alembic's drop_constraint(None, ...) raises at runtime.
    op.create_foreign_key('task_queue_organization_id_fkey', 'task_queue', 'organization', ['organization_id'], ['id'], ondelete='CASCADE')
    op.drop_column('task_queue', 'project_id')
    # ### end Alembic commands ###


def downgrade():
    """Revert the organization-based task queue and drop the macro
    execution summary table.

    NOTE(review): the original project_id values were dropped in upgrade()
    and cannot be restored — the re-added column comes back NULL, which is
    acceptable since it is nullable.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('task_queue', sa.Column('project_id', postgresql.UUID(), autoincrement=False, nullable=True))
    # The upgrade created the FK unnamed, so PostgreSQL assigned its default
    # <table>_<column>_fkey name; drop it by that name — passing None here
    # raises "ValueError: Constraint must have a name" in Alembic.
    op.drop_constraint('task_queue_organization_id_fkey', 'task_queue', type_='foreignkey')
    op.create_foreign_key('task_queue_project_id_fkey', 'task_queue', 'project', ['project_id'], ['id'], ondelete='CASCADE')
    op.drop_column('task_queue', 'organization_id')
    op.drop_index(op.f('ix_cognition_macro_execution_summary_organization_id'), table_name='macro_execution_summary', schema='cognition')
    op.drop_index(op.f('ix_cognition_macro_execution_summary_creation_month'), table_name='macro_execution_summary', schema='cognition')
    op.drop_table('macro_execution_summary', schema='cognition')
    # ### end Alembic commands ###
15 changes: 10 additions & 5 deletions api/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from submodules.model import events
from util import doc_ock, notification, adapter

from controller.task_queue import manager as task_queue_manager
from controller.task_master import manager as task_master_manager
from submodules.model.enums import TaskType, RecordTokenizationScope

logging.basicConfig(level=logging.DEBUG)
Expand Down Expand Up @@ -71,17 +71,22 @@ async def post(self, request_body) -> JSONResponse:
adapter.check(data, project.id, user.id)

project_manager.add_workflow_store_data_to_project(
user_id=user.id, project_id=project.id, file_name=name, data=data
user_id=user.id,
project_id=project.id,
org_id=project.organization_id,
file_name=name,
data=data,
)

task_queue_manager.add_task(
str(project.id),
TaskType.TOKENIZATION,
task_master_manager.queue_task(
str(organization.id),
str(user.id),
TaskType.TOKENIZATION,
{
"scope": RecordTokenizationScope.PROJECT.value,
"include_rats": True,
"only_uploaded_attributes": False,
"project_id": str(project.id),
},
)

Expand Down
96 changes: 9 additions & 87 deletions api/transfer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import traceback
import time
from typing import Optional, Dict
from typing import Optional
from starlette.endpoints import HTTPEndpoint
from starlette.responses import PlainTextResponse, JSONResponse
from controller.embedding.manager import recreate_embeddings
Expand All @@ -18,12 +18,11 @@
general,
organization,
tokenization,
project as refinery_project,
project,
)

from submodules.model.cognition_objects import (
project as cognition_project,
macro as macro_db_bo,
)

from controller.transfer import manager as transfer_manager
Expand All @@ -41,7 +40,7 @@
from util import daemon, notification
from controller.transfer.cognition.minio_upload import handle_cognition_file_upload

from controller.task_queue import manager as task_queue_manager
from controller.task_master import manager as task_master_manager
from submodules.model.enums import TaskType, RecordTokenizationScope


Expand Down Expand Up @@ -243,86 +242,6 @@ def put(self, request) -> PlainTextResponse:
return PlainTextResponse("OK")


class CognitionParseMarkdownFile(HTTPEndpoint):
def post(self, request) -> PlainTextResponse:
refinery_project_id = request.path_params["project_id"]
refinery_project_item = refinery_project.get(refinery_project_id)
if not refinery_project_item:
return PlainTextResponse("Bad project id", status_code=400)

dataset_id = request.path_params["dataset_id"]
file_id = request.path_params["file_id"]

# via thread to ensure the endpoint returns immediately

daemon.run(
CognitionParseMarkdownFile.__add_parse_markdown_file_thread,
refinery_project_id,
str(refinery_project_item.created_by),
{
"org_id": str(refinery_project_item.organization_id),
"dataset_id": dataset_id,
"file_id": file_id,
},
)

return PlainTextResponse("OK")

def __add_parse_markdown_file_thread(
project_id: str, user_id: str, task_info: Dict[str, str]
):

ctx_token = general.get_ctx_token()
try:
task_queue_manager.add_task(
project_id, TaskType.PARSE_MARKDOWN_FILE, user_id, task_info
)
finally:
general.remove_and_refresh_session(ctx_token, False)


class CognitionStartMacroExecutionGroup(HTTPEndpoint):
def put(self, request) -> PlainTextResponse:
macro_id = request.path_params["macro_id"]
group_id = request.path_params["group_id"]

execution_entries = macro_db_bo.get_all_macro_executions(macro_id, group_id)

if len(execution_entries) == 0:
return PlainTextResponse("No executions found", status_code=400)
if not (cognition_prj_id := execution_entries[0].meta_info.get("project_id")):
return PlainTextResponse("No project id found", status_code=400)
cognition_prj = cognition_project.get(cognition_prj_id)
refinery_prj_id = str(
refinery_project.get_or_create_queue_project(
cognition_prj.organization_id, cognition_prj.created_by, True
).id
)
cached = {str(e.id): str(e.created_by) for e in execution_entries}

def queue_tasks():
token = general.get_ctx_token()
try:
for exec_id in cached:
task_queue_manager.add_task(
refinery_prj_id,
TaskType.RUN_COGNITION_MACRO,
cached[exec_id],
{
"macro_id": macro_id,
"execution_id": exec_id,
"execution_group_id": group_id,
},
)
general.commit()
finally:
general.remove_and_refresh_session(token, False)

daemon.run(queue_tasks)

return PlainTextResponse("OK")


class AssociationsImport(HTTPEndpoint):
async def post(self, request) -> JSONResponse:
project_id = request.path_params["project_id"]
Expand Down Expand Up @@ -404,11 +323,14 @@ def init_file_import(task: UploadTask, project_id: str, is_global_update: bool)
)
if task.file_type != "knowledge_base":
only_usable_attributes = task.file_type == "records_add"
task_queue_manager.add_task(
project_id,
project_item = project.get(project_id)
org_id = str(project_item.organization_id)
task_master_manager.queue_task(
str(org_id),
str(task.user_id),
TaskType.TOKENIZATION,
task.user_id,
{
"project_id": str(project_id),
"scope": RecordTokenizationScope.PROJECT.value,
"include_rats": True,
"only_uploaded_attributes": only_usable_attributes,
Expand Down
21 changes: 9 additions & 12 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
UploadTaskInfo,
CognitionImport,
CognitionPrepareProject,
CognitionParseMarkdownFile,
CognitionStartMacroExecutionGroup,
)
from fast_api.routes.organization import router as org_router
from fast_api.routes.project import router as project_router
Expand All @@ -36,12 +34,12 @@
from fast_api.routes.record import router as record_router
from fast_api.routes.weak_supervision import router as weak_supervision_router
from fast_api.routes.labeling_tasks import router as labeling_tasks_router
from fast_api.routes.task_execution import router as task_execution_router
from middleware.database_session import handle_db_session
from middleware.starlette_tmp_middleware import DatabaseSessionHandler
from starlette.applications import Starlette
from starlette.routing import Route, Mount

from controller.task_queue.task_queue import init_task_queues
from controller.project.manager import check_in_deletion_projects
from route_prefix import (
PREFIX_ORGANIZATION,
Expand All @@ -62,6 +60,7 @@
PREFIX_RECORD,
PREFIX_WEAK_SUPERVISION,
PREFIX_LABELING_TASKS,
PREFIX_TASK_EXECUTION,
)
from util import security, clean_up
from middleware import log_storage
Expand Down Expand Up @@ -116,6 +115,10 @@
labeling_tasks_router, prefix=PREFIX_LABELING_TASKS, tags=["labeling-tasks"]
)

fastapi_app_internal = FastAPI()
fastapi_app_internal.include_router(
task_execution_router, prefix=PREFIX_TASK_EXECUTION, tags=["task-execution"]
)
routes = [
Route("/notify/{path:path}", Notify),
Route("/healthcheck", Healthcheck),
Expand All @@ -135,19 +138,14 @@
"/project/{cognition_project_id:str}/cognition/continue/{task_id:str}/finalize",
CognitionPrepareProject,
),
Route(
"/project/{project_id:str}/cognition/datasets/{dataset_id:str}/files/{file_id:str}/queue",
CognitionParseMarkdownFile,
),
Route("/project/{project_id:str}/import/task/{task_id:str}", UploadTaskInfo),
Route("/project", ProjectCreationFromWorkflow),
Route(
"/macro/{macro_id:str}/execution-group/{group_id:str}/queue",
CognitionStartMacroExecutionGroup,
),
Route("/is_managed", IsManagedRest),
Route("/is_demo", IsDemoRest),
Mount("/api", app=fastapi_app, name="REST API"),
Mount(
"/internal/api", app=fastapi_app_internal, name="INTERNAL REST API"
), # task master requesting
]


Expand All @@ -156,7 +154,6 @@
middleware = [Middleware(DatabaseSessionHandler)]
app = Starlette(routes=routes, middleware=middleware)

init_task_queues()
check_in_deletion_projects()
security.check_secret_key()
clean_up.clean_up_database()
Expand Down
Loading
Loading