diff --git a/src/backend/v4/magentic_agents/proxy_agent.py b/src/backend/v4/magentic_agents/proxy_agent.py
index 04f3cb6c..535c6310 100644
--- a/src/backend/v4/magentic_agents/proxy_agent.py
+++ b/src/backend/v4/magentic_agents/proxy_agent.py
@@ -81,7 +81,6 @@ def get_new_thread(self, **kwargs: Any) -> AgentThread:
             A new AgentThread instance
         """
         return AgentThread(**kwargs)
-
     async def run(
         self,
         messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
@@ -161,7 +160,7 @@ async def _invoke_stream_internal(
         )
         logger.debug("ProxyAgent: Message text: %s", message_text[:100])
 
-        clarification_req_text = f"I need clarification about: {message_text}"
+        clarification_req_text = f"{message_text}"
         clarification_request = UserClarificationRequest(
             question=clarification_req_text,
             request_id=str(uuid.uuid4()),
diff --git a/src/backend/v4/orchestration/orchestration_manager.py b/src/backend/v4/orchestration/orchestration_manager.py
index d7a8829b..24aadd4e 100644
--- a/src/backend/v4/orchestration/orchestration_manager.py
+++ b/src/backend/v4/orchestration/orchestration_manager.py
@@ -11,7 +11,7 @@
     ChatMessage,
     WorkflowOutputEvent,
     MagenticBuilder,
-    # MagenticCallbackMode, 
+    # MagenticCallbackMode,
     MagenticOrchestratorMessageEvent,
     MagenticAgentDeltaEvent,
     MagenticAgentMessageEvent,
@@ -52,7 +52,7 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
         - HumanApprovalMagenticManager as orchestrator manager
         - AzureAIAgentClient as the underlying chat client
         - Event-based callbacks for streaming and final responses
-        
+
         This mirrors the old Semantic Kernel orchestration setup:
         - Uses same deployment, endpoint, and credentials
         - Applies same execution settings (temperature, max_tokens)
@@ -71,21 +71,22 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
                 project_endpoint=config.AZURE_AI_PROJECT_ENDPOINT,
                 model_deployment_name=config.AZURE_OPENAI_DEPLOYMENT_NAME,
                 async_credential=credential,
+
             )
-            
+
             cls.logger.info(
                 "Created AzureAIAgentClient for orchestration with model '%s' at endpoint '%s'",
                 config.AZURE_OPENAI_DEPLOYMENT_NAME,
-                config.AZURE_AI_PROJECT_ENDPOINT
+                config.AZURE_AI_PROJECT_ENDPOINT,
             )
         except Exception as e:
             cls.logger.error("Failed to create AzureAIAgentClient: %s", e)
             raise
 
         # Create HumanApprovalMagenticManager with the chat client
-        # Execution settings (temperature=0.1, max_tokens=4000) are configured via 
+        # Execution settings (temperature=0.1, max_tokens=4000) are configured via
        # orchestration_config.create_execution_settings() which matches old SK version
-        try: 
+        try:
             manager = HumanApprovalMagenticManager(
                 user_id=user_id,
                 chat_client=chat_client,
@@ -95,7 +96,7 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
             cls.logger.info(
                 "Created HumanApprovalMagenticManager for user '%s' with max_rounds=%d",
                 user_id,
-                orchestration_config.max_rounds
+                orchestration_config.max_rounds,
             )
         except Exception as e:
             cls.logger.error("Failed to create manager: %s", e)
@@ -107,11 +108,11 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
             name = getattr(ag, "agent_name", None) or getattr(ag, "name", None)
             if not name:
                 name = f"agent_{len(participants)+1}"
-            
+
             # Extract the inner ChatAgent for wrapper templates
             # FoundryAgentTemplate and ReasoningAgentTemplate wrap a ChatAgent in self._agent
             # ProxyAgent directly extends BaseAgent and can be used as-is
-            if hasattr(ag, '_agent') and ag._agent is not None:
+            if hasattr(ag, "_agent") and ag._agent is not None:
                 # This is a wrapper (FoundryAgentTemplate or ReasoningAgentTemplate)
                 # Use the inner ChatAgent which implements AgentProtocol
                 participants[name] = ag._agent
@@ -121,16 +122,23 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
                 participants[name] = ag
             cls.logger.debug("Added participant '%s'", name)
 
-        # Assemble workflow with .on_event() callback (proper way for agent_framework)
+        # Assemble workflow with callback (proper way for agent_framework)
         builder = (
             MagenticBuilder()
             .participants(**participants)
-            .with_standard_manager(manager=manager)
+            .with_standard_manager(
+                manager=manager,
+                max_round_count=orchestration_config.max_rounds,
+                max_stall_count=0,
+            )
         )
 
         # Build workflow
         workflow = builder.build()
-        cls.logger.info("Built Magentic workflow with %d participants and event callbacks", len(participants))
+        cls.logger.info(
+            "Built Magentic workflow with %d participants and event callbacks",
+            len(participants),
+        )
 
         return workflow
 
@@ -149,10 +157,14 @@ async def get_current_or_new_orchestration(
         current = orchestration_config.get_current_orchestration(user_id)
         if current is None or team_switched:
             if current is not None and team_switched:
-                cls.logger.info("Team switched, closing previous agents for user '%s'", user_id)
+                cls.logger.info(
+                    "Team switched, closing previous agents for user '%s'", user_id
+                )
                 # Close prior agents (same logic as old version)
                 for agent in getattr(current, "_participants", {}).values():
-                    agent_name = getattr(agent, "agent_name", getattr(agent, "name", ""))
+                    agent_name = getattr(
+                        agent, "agent_name", getattr(agent, "name", "")
+                    )
                     if agent_name != "ProxyAgent":
                         close_coro = getattr(agent, "close", None)
                         if callable(close_coro):
@@ -161,6 +173,7 @@ async def get_current_or_new_orchestration(
                                 cls.logger.debug("Closed agent '%s'", agent_name)
                             except Exception as e:
                                 cls.logger.error("Error closing agent: %s", e)
+
 
             factory = MagenticAgentFactory()
             try:
@@ -169,16 +182,20 @@ async def get_current_or_new_orchestration(
                 )
                 cls.logger.info("Created %d agents for user '%s'", len(agents), user_id)
             except Exception as e:
-                cls.logger.error("Failed to create agents for user '%s': %s", user_id, e)
+                cls.logger.error(
+                    "Failed to create agents for user '%s': %s", user_id, e
+                )
                 print(f"Failed to create agents for user '{user_id}': {e}")
                 raise
             try:
                 cls.logger.info("Initializing new orchestration for user '%s'", user_id)
-                orchestration_config.orchestrations[user_id] = await cls.init_orchestration(
-                    agents, user_id
+                orchestration_config.orchestrations[user_id] = (
+                    await cls.init_orchestration(agents, user_id)
                 )
             except Exception as e:
-                cls.logger.error("Failed to initialize orchestration for user '%s': %s", user_id, e)
+                cls.logger.error(
+                    "Failed to initialize orchestration for user '%s': %s", user_id, e
+                )
                 print(f"Failed to initialize orchestration for user '{user_id}': {e}")
                 raise
         return orchestration_config.get_current_orchestration(user_id)
@@ -189,7 +206,7 @@ async def get_current_or_new_orchestration(
     async def run_orchestration(self, user_id: str, input_task) -> None:
         """
         Execute the Magentic workflow for the provided user and task description.
-        
+
         This mirrors the old SK orchestration:
         - Uses same execution settings (temperature=0.1, max_tokens=4000)
         - Maintains same approval workflow
@@ -197,20 +214,52 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
         """
         job_id = str(uuid.uuid4())
         orchestration_config.set_approval_pending(job_id)
-        self.logger.info("Starting orchestration job '%s' for user '%s'", job_id, user_id)
+        self.logger.info(
+            "Starting orchestration job '%s' for user '%s'", job_id, user_id
+        )
         workflow = orchestration_config.get_current_orchestration(user_id)
         if workflow is None:
             raise ValueError("Orchestration not initialized for user.")
 
+        # Fresh thread per participant to avoid cross-run state bleed
+        executors = getattr(workflow, "executors", {})
+        self.logger.debug("Executor keys at run start: %s", list(executors.keys()))
 
-        # Ensure manager tracks user_id (same as old version)
-        try:
-            manager = getattr(workflow, "_manager", None)
-            if manager and hasattr(manager, "current_user_id"):
-                manager.current_user_id = user_id
-                self.logger.debug("Set user_id on manager = %s", user_id)
-        except Exception as e:
-            self.logger.error("Error setting user_id on manager: %s", e)
+        for exec_key, executor in executors.items():
+            try:
+                if exec_key == "magentic_orchestrator":
+                    # Orchestrator path
+                    if hasattr(executor, "_conversation"):
+                        conv = getattr(executor, "_conversation")
+                        # Support list-like or custom container with clear()
+                        if hasattr(conv, "clear") and callable(conv.clear):
+                            conv.clear()
+                            self.logger.debug("Cleared orchestrator conversation (%s)", exec_key)
+                        elif isinstance(conv, list):
+                            conv[:] = []
+                            self.logger.debug("Emptied orchestrator conversation list (%s)", exec_key)
+                        else:
+                            self.logger.debug("Orchestrator conversation not clearable type (%s): %s", exec_key, type(conv))
+                    else:
+                        self.logger.debug("Orchestrator has no _conversation attribute (%s)", exec_key)
+                else:
+                    # Agent path
+                    if hasattr(executor, "_chat_history"):
+                        hist = getattr(executor, "_chat_history")
+                        if hasattr(hist, "clear") and callable(hist.clear):
+                            hist.clear()
+                            self.logger.debug("Cleared agent chat history (%s)", exec_key)
+                        elif isinstance(hist, list):
+                            hist[:] = []
+                            self.logger.debug("Emptied agent chat history list (%s)", exec_key)
+                        else:
+                            self.logger.debug("Agent chat history not clearable type (%s): %s", exec_key, type(hist))
+                    else:
+                        self.logger.debug("Agent executor has no _chat_history attribute (%s)", exec_key)
+            except Exception as e:
+                self.logger.warning("Failed clearing state for executor %s: %s", exec_key, e)
+        # --- END NEW BLOCK ---
+
         # Build task from input (same as old version)
         task_text = getattr(input_task, "description", str(input_task))
 
@@ -220,62 +269,78 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
             # Execute workflow using run_stream with task as positional parameter
             # The execution settings are configured in the manager/client
             final_output: str | None = None
-            
+
             self.logger.info("Starting workflow execution...")
             async for event in workflow.run_stream(task_text):
                 try:
                     # Handle orchestrator messages (task assignments, coordination)
                     if isinstance(event, MagenticOrchestratorMessageEvent):
-                        message_text = getattr(event.message, 'text', '')
+                        message_text = getattr(event.message, "text", "")
                         self.logger.info(f"[ORCHESTRATOR:{event.kind}] {message_text}")
-                    
+
                     # Handle streaming updates from agents
                     elif isinstance(event, MagenticAgentDeltaEvent):
                         try:
                             await streaming_agent_response_callback(
-                                event.agent_id, 
+                                event.agent_id,
                                 event,  # Pass the event itself as the update object
                                 False,  # Not final yet (streaming in progress)
-                                user_id
+                                user_id,
                             )
                         except Exception as e:
-                            self.logger.error(f"Error in streaming callback for agent {event.agent_id}: {e}")
-                    
+                            self.logger.error(
+                                f"Error in streaming callback for agent {event.agent_id}: {e}"
+                            )
+
                     # Handle final agent messages (complete response)
                     elif isinstance(event, MagenticAgentMessageEvent):
                         if event.message:
                             try:
-                                agent_response_callback(event.agent_id, event.message, user_id)
+                                agent_response_callback(
+                                    event.agent_id, event.message, user_id
+                                )
                             except Exception as e:
-                                self.logger.error(f"Error in agent callback for agent {event.agent_id}: {e}")
-                    
+                                self.logger.error(
+                                    f"Error in agent callback for agent {event.agent_id}: {e}"
+                                )
+
                     # Handle final result from the entire workflow
                     elif isinstance(event, MagenticFinalResultEvent):
-                        final_text = getattr(event.message, 'text', '')
-                        self.logger.info(f"[FINAL RESULT] Length: {len(final_text)} chars")
-                    
+                        final_text = getattr(event.message, "text", "")
+                        self.logger.info(
+                            f"[FINAL RESULT] Length: {len(final_text)} chars"
+                        )
+
                     # Handle workflow output event (captures final result)
                     elif isinstance(event, WorkflowOutputEvent):
                         output_data = event.data
                         if isinstance(output_data, ChatMessage):
-                            final_output = getattr(output_data, "text", None) or str(output_data)
+                            final_output = getattr(output_data, "text", None) or str(
+                                output_data
+                            )
                         else:
                             final_output = str(output_data)
                         self.logger.debug("Received workflow output event")
-                    
+
                 except Exception as e:
-                    self.logger.error(f"Error processing event {type(event).__name__}: {e}", exc_info=True)
+                    self.logger.error(
+                        f"Error processing event {type(event).__name__}: {e}",
+                        exc_info=True,
+                    )
 
             # Extract final result
             final_text = final_output if final_output else ""
-            
-            # Log results (same format as old version)
+
+            # Log results
            self.logger.info("\nAgent responses:")
-            self.logger.info("Orchestration completed. Final result length: %d chars", len(final_text))
+            self.logger.info(
+                "Orchestration completed. Final result length: %d chars",
+                len(final_text),
+            )
             self.logger.info("\nFinal result:\n%s", final_text)
             self.logger.info("=" * 50)
 
-            # Send final result via WebSocket (same as old version)
+            # Send final result via WebSocket
             await connection_config.send_status_update_async(
                 {
                     "type": WebsocketMessageType.FINAL_RESULT_MESSAGE,
@@ -289,15 +354,15 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
                 message_type=WebsocketMessageType.FINAL_RESULT_MESSAGE,
             )
             self.logger.info("Final result sent via WebSocket to user '%s'", user_id)
-        
+
         except Exception as e:
-            # Error handling (enhanced from old version)
+            # Error handling
             self.logger.error("Unexpected orchestration error: %s", e, exc_info=True)
             self.logger.error("Error type: %s", type(e).__name__)
             if hasattr(e, "__dict__"):
                 self.logger.error("Error attributes: %s", e.__dict__)
             self.logger.info("=" * 50)
-            
+
             # Send error status to user
             try:
                 await connection_config.send_status_update_async(
@@ -314,4 +379,4 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
                 )
             except Exception as send_error:
                 self.logger.error("Failed to send error status: %s", send_error)
-            raise
\ No newline at end of file
+            raise
diff --git a/src/frontend/src/pages/PlanPage.tsx b/src/frontend/src/pages/PlanPage.tsx
index bef20846..1c3743ae 100644
--- a/src/frontend/src/pages/PlanPage.tsx
+++ b/src/frontend/src/pages/PlanPage.tsx
@@ -106,6 +106,7 @@ const PlanPage: React.FC = () => {
       if (pendingNavigation) {
         pendingNavigation();
       }
+      webSocketService.disconnect();
     } catch (error) {
       console.error('❌ Failed to cancel plan:', error);
       showToast('Failed to cancel the plan properly, but navigation will continue.', 'error');
@@ -354,6 +355,7 @@ const PlanPage: React.FC = () => {
 
     // Wait for the agent message to be processed and persisted
     // The processAgentMessage function will handle refreshing the task list
+    webSocketService.disconnect();
     processAgentMessage(agentMessageData, planData, is_final, streamingMessageBuffer);
   }
 
diff --git a/src/frontend/src/services/WebSocketService.tsx b/src/frontend/src/services/WebSocketService.tsx
index 0f9c72a5..e8e7b396 100644
--- a/src/frontend/src/services/WebSocketService.tsx
+++ b/src/frontend/src/services/WebSocketService.tsx
@@ -91,6 +91,7 @@ class WebSocketService {
   }
 
   disconnect(): void {
+    console.log('WebSocketService: Disconnecting WebSocket');
     if (this.reconnectTimer) {
       clearTimeout(this.reconnectTimer);
       this.reconnectTimer = null;
@@ -104,21 +105,6 @@ class WebSocketService {
     this.isConnecting = false;
   }
 
-  subscribeToPlan(planId: string): void {
-    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
-      const message = { type: 'subscribe_plan', plan_id: planId };
-      this.ws.send(JSON.stringify(message));
-      this.planSubscriptions.add(planId);
-    }
-  }
-
-  unsubscribeFromPlan(planId: string): void {
-    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
-      const message = { type: 'unsubscribe_plan', plan_id: planId };
-      this.ws.send(JSON.stringify(message));
-      this.planSubscriptions.delete(planId);
-    }
-  }
 
   on(eventType: string, callback: (message: StreamMessage) => void): () => void {
     if (!this.listeners.has(eventType)) {