11"""CosmosDB implementation of the database interface."""
22
3- import json
4- import logging
5- import uuid
6-
73import datetime
4+ import logging
85from typing import Any , Dict , List , Optional , Type
96
10- from azure . cosmos import PartitionKey , exceptions
7+ import v3 . models . messages as messages
118from azure .cosmos .aio import CosmosClient
129from azure .cosmos .aio ._database import DatabaseProxy
13- from azure .cosmos .exceptions import CosmosResourceExistsError
14- import v3 .models .messages as messages
1510
16- from common .models .messages_kernel import (
17- AgentMessage ,
18- Plan ,
19- Step ,
20- TeamConfiguration ,
21- )
22- from common .utils .utils_date import DateTimeEncoder
23-
24- from .database_base import DatabaseBase
2511from ..models .messages_kernel import (
12+ AgentMessage ,
2613 AgentMessageData ,
2714 BaseDataModel ,
15+ DataType ,
2816 Plan ,
2917 Step ,
30- AgentMessage ,
3118 TeamConfiguration ,
32- DataType ,
3319 UserCurrentTeam ,
3420)
21+ from .database_base import DatabaseBase
3522
3623
3724class CosmosDBClient (DatabaseBase ):
@@ -189,7 +176,6 @@ async def delete_item(self, item_id: str, partition_key: str) -> None:
189176 self .logger .error ("Failed to delete item from CosmosDB: %s" , str (e ))
190177 raise
191178
192-
193179 # Plan Operations
194180 async def add_plan (self , plan : Plan ) -> None :
195181 """Add a plan to CosmosDB."""
@@ -199,7 +185,6 @@ async def update_plan(self, plan: Plan) -> None:
199185 """Update a plan in CosmosDB."""
200186 await self .update_item (plan )
201187
202-
203188 async def get_plan_by_plan_id (self , plan_id : str ) -> Optional [Plan ]:
204189 """Retrieve a plan by plan_id."""
205190 query = "SELECT * FROM c WHERE c.id=@plan_id AND c.data_type=@data_type"
@@ -234,8 +219,9 @@ async def get_all_plans_by_team_id(self, team_id: str) -> List[Plan]:
234219 ]
235220 return await self .query_items (query , parameters , Plan )
236221
237-
238- async def get_all_plans_by_team_id_status (self , user_id : str ,team_id : str , status : str ) -> List [Plan ]:
222+ async def get_all_plans_by_team_id_status (
223+ self , user_id : str , team_id : str , status : str
224+ ) -> List [Plan ]:
239225 """Retrieve all plans for a specific team."""
240226 query = "SELECT * FROM c WHERE c.team_id=@team_id AND c.data_type=@data_type and c.user_id=@user_id and c.overall_status=@status ORDER BY c._ts DESC"
241227 parameters = [
@@ -245,6 +231,7 @@ async def get_all_plans_by_team_id_status(self, user_id: str,team_id: str, statu
245231 {"name" : "@status" , "value" : status },
246232 ]
247233 return await self .query_items (query , parameters , Plan )
234+
248235 # Step Operations
249236 async def add_step (self , step : Step ) -> None :
250237 """Add a step to CosmosDB."""
@@ -414,8 +401,6 @@ async def get_current_team(self, user_id: str) -> Optional[UserCurrentTeam]:
414401 teams = await self .query_items (query , parameters , UserCurrentTeam )
415402 return teams [0 ] if teams else None
416403
417-
418-
419404 async def delete_current_team (self , user_id : str ) -> bool :
420405 """Delete the current team for a user."""
421406 query = "SELECT c.id, c.session_id FROM c WHERE c.user_id=@user_id AND c.data_type=@data_type"
@@ -429,9 +414,13 @@ async def delete_current_team(self, user_id: str) -> bool:
429414 if items :
430415 async for doc in items :
431416 try :
432- await self .container .delete_item (doc ["id" ], partition_key = doc ["session_id" ])
417+ await self .container .delete_item (
418+ doc ["id" ], partition_key = doc ["session_id" ]
419+ )
433420 except Exception as e :
434- self .logger .warning ("Failed deleting current team doc %s: %s" , doc .get ("id" ), e )
421+ self .logger .warning (
422+ "Failed deleting current team doc %s: %s" , doc .get ("id" ), e
423+ )
435424
436425 return True
437426
@@ -457,9 +446,13 @@ async def delete_plan_by_plan_id(self, plan_id: str) -> bool:
457446 if items :
458447 async for doc in items :
459448 try :
460- await self .container .delete_item (doc ["id" ], partition_key = doc ["session_id" ])
449+ await self .container .delete_item (
450+ doc ["id" ], partition_key = doc ["session_id" ]
451+ )
461452 except Exception as e :
462- self .logger .warning ("Failed deleting current team doc %s: %s" , doc .get ("id" ), e )
453+ self .logger .warning (
454+ "Failed deleting current team doc %s: %s" , doc .get ("id" ), e
455+ )
463456
464457 return True
465458
@@ -471,7 +464,6 @@ async def update_mplan(self, mplan: messages.MPlan) -> None:
471464 """Update a team configuration in the database."""
472465 await self .update_item (mplan )
473466
474-
475467 async def get_mplan (self , plan_id : str ) -> Optional [messages .MPlan ]:
476468 """Retrieve a mplan configuration by mplan_id."""
477469 query = "SELECT * FROM c WHERE c.plan_id=@plan_id AND c.data_type=@data_type"
@@ -481,7 +473,6 @@ async def get_mplan(self, plan_id: str) -> Optional[messages.MPlan]:
481473 ]
482474 results = await self .query_items (query , parameters , messages .MPlan )
483475 return results [0 ] if results else None
484-
485476
486477 async def add_agent_message (self , message : AgentMessageData ) -> None :
487478 """Add an agent message to the database."""
@@ -499,4 +490,4 @@ async def get_agent_messages(self, plan_id: str) -> List[AgentMessageData]:
499490 {"name" : "@data_type" , "value" : DataType .m_plan_message },
500491 ]
501492
502- return await self .query_items (query , parameters , AgentMessageData )
493+ return await self .query_items (query , parameters , AgentMessageData )
0 commit comments