Skip to content

Commit e376553

Browse files
committed
Add checkpointing to orchestration workflow
Introduces InMemoryCheckpointStorage to enable checkpointing in the Magentic workflow. Cleans up outdated comments referencing previous orchestration setups and updates workflow execution to include thread_id for better tracking.
1 parent 2a059c4 commit e376553

File tree

1 file changed

+5
-9
lines changed

1 file changed

+5
-9
lines changed

src/backend/v4/orchestration/orchestration_manager.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
ChatMessage,
1212
WorkflowOutputEvent,
1313
MagenticBuilder,
14-
# MagenticCallbackMode,
14+
InMemoryCheckpointStorage,
1515
MagenticOrchestratorMessageEvent,
1616
MagenticAgentDeltaEvent,
1717
MagenticAgentMessageEvent,
@@ -52,8 +52,6 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
5252
- HumanApprovalMagenticManager as orchestrator manager
5353
- AzureAIAgentClient as the underlying chat client
5454
- Event-based callbacks for streaming and final responses
55-
56-
This mirrors the old Semantic Kernel orchestration setup:
5755
- Uses same deployment, endpoint, and credentials
5856
- Applies same execution settings (temperature, max_tokens)
5957
- Maintains same human approval workflow
@@ -122,7 +120,8 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
122120
participants[name] = ag
123121
cls.logger.debug("Added participant '%s'", name)
124122

125-
# Assemble workflow with callback (proper way for agent_framework)
123+
# Assemble workflow with callback
124+
storage = InMemoryCheckpointStorage()
126125
builder = (
127126
MagenticBuilder()
128127
.participants(**participants)
@@ -131,6 +130,7 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
131130
max_round_count=orchestration_config.max_rounds,
132131
max_stall_count=0,
133132
)
133+
.with_checkpointing(storage)
134134
)
135135

136136
# Build workflow
@@ -206,11 +206,6 @@ async def get_current_or_new_orchestration(
206206
async def run_orchestration(self, user_id: str, input_task) -> None:
207207
"""
208208
Execute the Magentic workflow for the provided user and task description.
209-
210-
This mirrors the old SK orchestration:
211-
- Uses same execution settings (temperature=0.1, max_tokens=4000)
212-
- Maintains same approval workflow
213-
- Sends same WebSocket updates
214209
"""
215210
job_id = str(uuid.uuid4())
216211
orchestration_config.set_approval_pending(job_id)
@@ -271,6 +266,7 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
271266
final_output: str | None = None
272267

273268
self.logger.info("Starting workflow execution...")
269+
thread_id=f"task-{job_id}"
274270
async for event in workflow.run_stream(task_text):
275271
try:
276272
# Handle orchestrator messages (task assignments, coordination)

0 commit comments

Comments
 (0)