Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/backend/v4/magentic_agents/proxy_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ def get_new_thread(self, **kwargs: Any) -> AgentThread:
A new AgentThread instance
"""
return AgentThread(**kwargs)

async def run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
Expand Down Expand Up @@ -161,7 +160,7 @@ async def _invoke_stream_internal(
)
logger.debug("ProxyAgent: Message text: %s", message_text[:100])

clarification_req_text = f"I need clarification about: {message_text}"
clarification_req_text = f"{message_text}"
clarification_request = UserClarificationRequest(
question=clarification_req_text,
request_id=str(uuid.uuid4()),
Expand Down
169 changes: 117 additions & 52 deletions src/backend/v4/orchestration/orchestration_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
ChatMessage,
WorkflowOutputEvent,
MagenticBuilder,
# MagenticCallbackMode,
# MagenticCallbackMode,
MagenticOrchestratorMessageEvent,
MagenticAgentDeltaEvent,
MagenticAgentMessageEvent,
Expand Down Expand Up @@ -52,7 +52,7 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
- HumanApprovalMagenticManager as orchestrator manager
- AzureAIAgentClient as the underlying chat client
- Event-based callbacks for streaming and final responses

This mirrors the old Semantic Kernel orchestration setup:
- Uses same deployment, endpoint, and credentials
- Applies same execution settings (temperature, max_tokens)
Expand All @@ -71,21 +71,22 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
project_endpoint=config.AZURE_AI_PROJECT_ENDPOINT,
model_deployment_name=config.AZURE_OPENAI_DEPLOYMENT_NAME,
async_credential=credential,

)

cls.logger.info(
"Created AzureAIAgentClient for orchestration with model '%s' at endpoint '%s'",
config.AZURE_OPENAI_DEPLOYMENT_NAME,
config.AZURE_AI_PROJECT_ENDPOINT
config.AZURE_AI_PROJECT_ENDPOINT,
)
except Exception as e:
cls.logger.error("Failed to create AzureAIAgentClient: %s", e)
raise

# Create HumanApprovalMagenticManager with the chat client
# Execution settings (temperature=0.1, max_tokens=4000) are configured via
# Execution settings (temperature=0.1, max_tokens=4000) are configured via
# orchestration_config.create_execution_settings() which matches old SK version
try:
try:
manager = HumanApprovalMagenticManager(
user_id=user_id,
chat_client=chat_client,
Expand All @@ -95,7 +96,7 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
cls.logger.info(
"Created HumanApprovalMagenticManager for user '%s' with max_rounds=%d",
user_id,
orchestration_config.max_rounds
orchestration_config.max_rounds,
)
except Exception as e:
cls.logger.error("Failed to create manager: %s", e)
Expand All @@ -107,11 +108,11 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
name = getattr(ag, "agent_name", None) or getattr(ag, "name", None)
if not name:
name = f"agent_{len(participants)+1}"

# Extract the inner ChatAgent for wrapper templates
# FoundryAgentTemplate and ReasoningAgentTemplate wrap a ChatAgent in self._agent
# ProxyAgent directly extends BaseAgent and can be used as-is
if hasattr(ag, '_agent') and ag._agent is not None:
if hasattr(ag, "_agent") and ag._agent is not None:
# This is a wrapper (FoundryAgentTemplate or ReasoningAgentTemplate)
# Use the inner ChatAgent which implements AgentProtocol
participants[name] = ag._agent
Expand All @@ -121,16 +122,23 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
participants[name] = ag
cls.logger.debug("Added participant '%s'", name)

# Assemble workflow with .on_event() callback (proper way for agent_framework)
# Assemble workflow with callback (proper way for agent_framework)
builder = (
MagenticBuilder()
.participants(**participants)
.with_standard_manager(manager=manager)
.with_standard_manager(
manager=manager,
max_round_count=orchestration_config.max_rounds,
max_stall_count=0,
)
)

# Build workflow
workflow = builder.build()
cls.logger.info("Built Magentic workflow with %d participants and event callbacks", len(participants))
cls.logger.info(
"Built Magentic workflow with %d participants and event callbacks",
len(participants),
)

return workflow

Expand All @@ -149,10 +157,14 @@ async def get_current_or_new_orchestration(
current = orchestration_config.get_current_orchestration(user_id)
if current is None or team_switched:
if current is not None and team_switched:
cls.logger.info("Team switched, closing previous agents for user '%s'", user_id)
cls.logger.info(
"Team switched, closing previous agents for user '%s'", user_id
)
# Close prior agents (same logic as old version)
for agent in getattr(current, "_participants", {}).values():
agent_name = getattr(agent, "agent_name", getattr(agent, "name", ""))
agent_name = getattr(
agent, "agent_name", getattr(agent, "name", "")
)
if agent_name != "ProxyAgent":
close_coro = getattr(agent, "close", None)
if callable(close_coro):
Expand All @@ -161,6 +173,7 @@ async def get_current_or_new_orchestration(
cls.logger.debug("Closed agent '%s'", agent_name)
except Exception as e:
cls.logger.error("Error closing agent: %s", e)


factory = MagenticAgentFactory()
try:
Expand All @@ -169,16 +182,20 @@ async def get_current_or_new_orchestration(
)
cls.logger.info("Created %d agents for user '%s'", len(agents), user_id)
except Exception as e:
cls.logger.error("Failed to create agents for user '%s': %s", user_id, e)
cls.logger.error(
"Failed to create agents for user '%s': %s", user_id, e
)
print(f"Failed to create agents for user '{user_id}': {e}")
raise
try:
cls.logger.info("Initializing new orchestration for user '%s'", user_id)
orchestration_config.orchestrations[user_id] = await cls.init_orchestration(
agents, user_id
orchestration_config.orchestrations[user_id] = (
await cls.init_orchestration(agents, user_id)
)
except Exception as e:
cls.logger.error("Failed to initialize orchestration for user '%s': %s", user_id, e)
cls.logger.error(
"Failed to initialize orchestration for user '%s': %s", user_id, e
)
print(f"Failed to initialize orchestration for user '{user_id}': {e}")
raise
return orchestration_config.get_current_orchestration(user_id)
Expand All @@ -189,28 +206,60 @@ async def get_current_or_new_orchestration(
async def run_orchestration(self, user_id: str, input_task) -> None:
"""
Execute the Magentic workflow for the provided user and task description.

This mirrors the old SK orchestration:
- Uses same execution settings (temperature=0.1, max_tokens=4000)
- Maintains same approval workflow
- Sends same WebSocket updates
"""
job_id = str(uuid.uuid4())
orchestration_config.set_approval_pending(job_id)
self.logger.info("Starting orchestration job '%s' for user '%s'", job_id, user_id)
self.logger.info(
"Starting orchestration job '%s' for user '%s'", job_id, user_id
)

workflow = orchestration_config.get_current_orchestration(user_id)
if workflow is None:
raise ValueError("Orchestration not initialized for user.")
# Fresh thread per participant to avoid cross-run state bleed
executors = getattr(workflow, "executors", {})
self.logger.debug("Executor keys at run start: %s", list(executors.keys()))

# Ensure manager tracks user_id (same as old version)
try:
manager = getattr(workflow, "_manager", None)
if manager and hasattr(manager, "current_user_id"):
manager.current_user_id = user_id
self.logger.debug("Set user_id on manager = %s", user_id)
except Exception as e:
self.logger.error("Error setting user_id on manager: %s", e)
for exec_key, executor in executors.items():
try:
if exec_key == "magentic_orchestrator":
# Orchestrator path
if hasattr(executor, "_conversation"):
conv = getattr(executor, "_conversation")
# Support list-like or custom container with clear()
if hasattr(conv, "clear") and callable(conv.clear):
conv.clear()
self.logger.debug("Cleared orchestrator conversation (%s)", exec_key)
elif isinstance(conv, list):
conv[:] = []
self.logger.debug("Emptied orchestrator conversation list (%s)", exec_key)
else:
self.logger.debug("Orchestrator conversation not clearable type (%s): %s", exec_key, type(conv))
else:
self.logger.debug("Orchestrator has no _conversation attribute (%s)", exec_key)
else:
# Agent path
if hasattr(executor, "_chat_history"):
hist = getattr(executor, "_chat_history")
if hasattr(hist, "clear") and callable(hist.clear):
hist.clear()
self.logger.debug("Cleared agent chat history (%s)", exec_key)
elif isinstance(hist, list):
hist[:] = []
self.logger.debug("Emptied agent chat history list (%s)", exec_key)
else:
self.logger.debug("Agent chat history not clearable type (%s): %s", exec_key, type(hist))
else:
self.logger.debug("Agent executor has no _chat_history attribute (%s)", exec_key)
except Exception as e:
self.logger.warning("Failed clearing state for executor %s: %s", exec_key, e)
# --- END NEW BLOCK ---


# Build task from input (same as old version)
task_text = getattr(input_task, "description", str(input_task))
Expand All @@ -220,62 +269,78 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
# Execute workflow using run_stream with task as positional parameter
# The execution settings are configured in the manager/client
final_output: str | None = None

self.logger.info("Starting workflow execution...")
async for event in workflow.run_stream(task_text):
try:
# Handle orchestrator messages (task assignments, coordination)
if isinstance(event, MagenticOrchestratorMessageEvent):
message_text = getattr(event.message, 'text', '')
message_text = getattr(event.message, "text", "")
self.logger.info(f"[ORCHESTRATOR:{event.kind}] {message_text}")

# Handle streaming updates from agents
elif isinstance(event, MagenticAgentDeltaEvent):
try:
await streaming_agent_response_callback(
event.agent_id,
event.agent_id,
event, # Pass the event itself as the update object
False, # Not final yet (streaming in progress)
user_id
user_id,
)
except Exception as e:
self.logger.error(f"Error in streaming callback for agent {event.agent_id}: {e}")

self.logger.error(
f"Error in streaming callback for agent {event.agent_id}: {e}"
)

# Handle final agent messages (complete response)
elif isinstance(event, MagenticAgentMessageEvent):
if event.message:
try:
agent_response_callback(event.agent_id, event.message, user_id)
agent_response_callback(
event.agent_id, event.message, user_id
)
except Exception as e:
self.logger.error(f"Error in agent callback for agent {event.agent_id}: {e}")

self.logger.error(
f"Error in agent callback for agent {event.agent_id}: {e}"
)

# Handle final result from the entire workflow
elif isinstance(event, MagenticFinalResultEvent):
final_text = getattr(event.message, 'text', '')
self.logger.info(f"[FINAL RESULT] Length: {len(final_text)} chars")

final_text = getattr(event.message, "text", "")
self.logger.info(
f"[FINAL RESULT] Length: {len(final_text)} chars"
)

# Handle workflow output event (captures final result)
elif isinstance(event, WorkflowOutputEvent):
output_data = event.data
if isinstance(output_data, ChatMessage):
final_output = getattr(output_data, "text", None) or str(output_data)
final_output = getattr(output_data, "text", None) or str(
output_data
)
else:
final_output = str(output_data)
self.logger.debug("Received workflow output event")

except Exception as e:
self.logger.error(f"Error processing event {type(event).__name__}: {e}", exc_info=True)
self.logger.error(
f"Error processing event {type(event).__name__}: {e}",
exc_info=True,
)

# Extract final result
final_text = final_output if final_output else ""
# Log results (same format as old version)

# Log results
self.logger.info("\nAgent responses:")
self.logger.info("Orchestration completed. Final result length: %d chars", len(final_text))
self.logger.info(
"Orchestration completed. Final result length: %d chars",
len(final_text),
)
self.logger.info("\nFinal result:\n%s", final_text)
self.logger.info("=" * 50)

# Send final result via WebSocket (same as old version)
# Send final result via WebSocket
await connection_config.send_status_update_async(
{
"type": WebsocketMessageType.FINAL_RESULT_MESSAGE,
Expand All @@ -289,15 +354,15 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
message_type=WebsocketMessageType.FINAL_RESULT_MESSAGE,
)
self.logger.info("Final result sent via WebSocket to user '%s'", user_id)

except Exception as e:
# Error handling (enhanced from old version)
# Error handling
self.logger.error("Unexpected orchestration error: %s", e, exc_info=True)
self.logger.error("Error type: %s", type(e).__name__)
if hasattr(e, "__dict__"):
self.logger.error("Error attributes: %s", e.__dict__)
self.logger.info("=" * 50)

# Send error status to user
try:
await connection_config.send_status_update_async(
Expand All @@ -314,4 +379,4 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
)
except Exception as send_error:
self.logger.error("Failed to send error status: %s", send_error)
raise
raise
2 changes: 2 additions & 0 deletions src/frontend/src/pages/PlanPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ const PlanPage: React.FC = () => {
if (pendingNavigation) {
pendingNavigation();
}
webSocketService.disconnect();
} catch (error) {
console.error('❌ Failed to cancel plan:', error);
showToast('Failed to cancel the plan properly, but navigation will continue.', 'error');
Expand Down Expand Up @@ -354,6 +355,7 @@ const PlanPage: React.FC = () => {

// Wait for the agent message to be processed and persisted
// The processAgentMessage function will handle refreshing the task list
webSocketService.disconnect();
processAgentMessage(agentMessageData, planData, is_final, streamingMessageBuffer);

}
Expand Down
Loading
Loading