Skip to content

Commit 363de95

Browse files
author
Erez Sharim
committed
chore: langchain & pydantic ai weather example
adds a langchain example for weather agent adds a PydanticAI example for weather agent changes langchain default event function to send processing events for tool calls instead of message events
1 parent 05e2479 commit 363de95

File tree

12 files changed

+785
-197
lines changed

12 files changed

+785
-197
lines changed

examples/langchain_simple/agent.py

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,13 @@
1-
from typing import cast
2-
31
from flux0_core.agent_runners.api import AgentRunner, Deps, agent_runner
42
from flux0_core.agent_runners.context import Context
53
from flux0_core.sessions import (
6-
MessageEventData,
74
StatusEventData,
85
)
96
from flux0_stream.frameworks.langchain import RunContext, filter_and_map_events, handle_event
107
from langchain.chat_models import init_chat_model
118
from langchain_core.messages import HumanMessage, SystemMessage
129

13-
14-
async def read_user_input(deps: Deps, context: Context) -> str:
15-
"""Extract user input from the last message event."""
16-
# read session events and expect the last event to be the user input
17-
events = await deps.list_session_events(context.session_id)
18-
last_event = events[-1]
19-
if last_event.type != "message":
20-
raise ValueError(f"Expected last event to be a message, got {last_event.type}")
21-
user_event_data = cast(MessageEventData, last_event.data)
22-
for part in user_event_data["parts"]:
23-
if part["type"] == "content":
24-
user_input = part["content"]
25-
break
26-
if not user_input:
27-
raise ValueError("No TextPart found in user event data")
28-
29-
return str(user_input)
10+
from examples.utils.utils import read_user_input
3011

3112

3213
@agent_runner("langchain_simple")

examples/langgraph/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from lagom import Container
2+
3+
from .weather_agent import WeatherAgentRunner
4+
5+
6+
async def init_module(container: Container) -> None:
7+
container[WeatherAgentRunner] = WeatherAgentRunner
8+
9+
10+
async def shutdown_module() -> None:
11+
print("Shutdown!")
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
from flux0_core.agent_runners.api import AgentRunner, Deps, agent_runner
2+
from flux0_core.agent_runners.context import Context
3+
from flux0_core.sessions import (
4+
StatusEventData,
5+
)
6+
from flux0_stream.frameworks.langchain import RunContext, filter_and_map_events, handle_event
7+
from langchain_core.runnables.config import RunnableConfig
8+
from langchain_core.tools import tool
9+
10+
from examples.utils.utils import read_user_input
11+
from langgraph.checkpoint.memory import InMemorySaver
12+
from langgraph.prebuilt import create_react_agent
13+
14+
15+
@tool(name_or_callable="get_weather", description="Get the weather for a given city")
16+
def get_weather(city: str) -> str:
17+
"""Get weather for a given city."""
18+
return f"It's always sunny in {city}!"
19+
20+
21+
checkpointer = InMemorySaver()
22+
weather_agent = create_react_agent(
23+
model="gpt-4.1-nano",
24+
tools=[get_weather],
25+
prompt="You are a friendly, helpful assistant",
26+
checkpointer=checkpointer,
27+
)
28+
29+
30+
@agent_runner("langchain_weather_agent")
31+
class WeatherAgentRunner(AgentRunner):
32+
async def run(self, context: Context, deps: Deps) -> bool:
33+
# read the agent object
34+
agent = await deps.read_agent(context.agent_id)
35+
if not agent:
36+
raise ValueError(f"Agent with id {context.agent_id} not found")
37+
38+
# read session events and expect the last event to be the user input
39+
user_input = await read_user_input(deps, context)
40+
if not user_input:
41+
raise ValueError("No user input found in session events")
42+
43+
deps.logger.info(
44+
f"User Input: {user_input} for session {context.session_id} and agent {agent.id}"
45+
)
46+
47+
input = {"messages": [("user", user_input)]}
48+
config = RunnableConfig(
49+
configurable={
50+
"thread_id": context.session_id,
51+
"agent_id": agent.id,
52+
}
53+
)
54+
try:
55+
model_events = weather_agent.astream_events(
56+
input=input,
57+
config=config,
58+
version="v2",
59+
)
60+
61+
run_ctx: RunContext = RunContext(last_known_event_offset=0)
62+
# iterate over the model events and stream them to the client
63+
async for e in filter_and_map_events(model_events, deps.logger):
64+
await handle_event(
65+
agent,
66+
deps.correlator.correlation_id,
67+
e,
68+
deps.event_emitter,
69+
deps.logger,
70+
run_ctx,
71+
)
72+
73+
return True
74+
except Exception as e:
75+
await deps.event_emitter.enqueue_status_event(
76+
correlation_id=deps.correlator.correlation_id,
77+
data=StatusEventData(type="status", status="error", data=str(e)),
78+
)
79+
80+
return False
81+
finally:
82+
await deps.event_emitter.enqueue_status_event(
83+
correlation_id=deps.correlator.correlation_id,
84+
data=StatusEventData(type="status", status="completed"),
85+
)

examples/pydanticai/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from lagom import Container
2+
3+
from .weather_agent import WeatherAgentRunner
4+
5+
6+
async def init_module(container: Container) -> None:
7+
container[WeatherAgentRunner] = WeatherAgentRunner
8+
9+
10+
async def shutdown_module() -> None:
11+
print("Shutdown!")
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import time
2+
import uuid
3+
from dataclasses import dataclass
4+
from datetime import date
5+
6+
from flux0_core.agent_runners.api import Agent as FAgent
7+
from flux0_core.agent_runners.api import AgentRunner, Deps, agent_runner
8+
from flux0_core.agent_runners.context import Context
9+
from flux0_core.sessions import (
10+
EventId,
11+
StatusEventData,
12+
)
13+
from flux0_stream.emitter.utils.events import send_processing_event
14+
from flux0_stream.types import ChunkEvent
15+
from pydantic_ai import Agent
16+
from pydantic_ai.messages import (
17+
FinalResultEvent,
18+
FunctionToolCallEvent,
19+
FunctionToolResultEvent,
20+
PartDeltaEvent,
21+
PartStartEvent,
22+
TextPartDelta,
23+
ToolCallPartDelta,
24+
)
25+
from pydantic_ai.tools import RunContext
26+
27+
from examples.utils.utils import read_user_input
28+
29+
30+
@dataclass
31+
class WeatherService:
32+
async def get_forecast(self, location: str, forecast_date: date) -> str:
33+
# In real code: call weather API, DB queries, etc.
34+
return f"The forecast in {location} on {forecast_date} is 24°C and sunny."
35+
36+
async def get_historic_weather(self, location: str, forecast_date: date) -> str:
37+
# In real code: call a historical weather API or DB
38+
return f"The weather in {location} on {forecast_date} was 18°C and partly cloudy."
39+
40+
41+
weather_agent = Agent[WeatherService, str](
42+
"openai:gpt-4.1-nano",
43+
deps_type=WeatherService,
44+
output_type=str,
45+
system_prompt="Providing a weather forecast at the locations the user provides.",
46+
)
47+
48+
49+
@weather_agent.tool
50+
async def weather_forecast(
51+
ctx: RunContext[WeatherService],
52+
location: str,
53+
forecast_date: date,
54+
) -> str:
55+
if forecast_date >= date.today():
56+
return await ctx.deps.get_forecast(location, forecast_date)
57+
else:
58+
return await ctx.deps.get_historic_weather(location, forecast_date)
59+
60+
61+
async def send_message(
62+
deps: Deps,
63+
input: str,
64+
agent: FAgent,
65+
event_id: EventId,
66+
):
67+
await send_processing_event(deps, "thinking...")
68+
# Begin a node-by-node, streaming iteration
69+
async with weather_agent.iter(input, deps=WeatherService()) as run:
70+
async for node in run:
71+
if Agent.is_model_request_node(node):
72+
# A model request node => We can stream tokens from the model's request
73+
async with node.stream(run.ctx) as request_stream:
74+
async for event in request_stream:
75+
if isinstance(event, PartStartEvent):
76+
pass
77+
elif isinstance(event, PartDeltaEvent):
78+
if isinstance(event.delta, TextPartDelta):
79+
pass
80+
elif isinstance(event.delta, ToolCallPartDelta):
81+
pass
82+
elif isinstance(event, FinalResultEvent):
83+
pass
84+
elif Agent.is_call_tools_node(node):
85+
# A handle-response node => The model returned some data, potentially calls a tool
86+
async with node.stream(run.ctx) as handle_stream:
87+
async for event in handle_stream:
88+
if isinstance(event, FunctionToolCallEvent):
89+
value = f"[Tools] The LLM calls tool={event.part.tool_name!r} with args={event.part.args}"
90+
await send_processing_event(deps, value)
91+
elif isinstance(event, FunctionToolResultEvent):
92+
value = f"[Tools] Tool call {event.tool_call_id!r} returned => {event.result.content}"
93+
await send_processing_event(deps, value)
94+
elif Agent.is_end_node(node):
95+
assert run.result.output == node.data.output # type: ignore
96+
# Once an End node is reached, the agent run is complete
97+
await deps.event_emitter.enqueue_status_event(
98+
correlation_id=deps.correlator.correlation_id,
99+
data=StatusEventData(type="status", status="typing"),
100+
)
101+
cec = ChunkEvent(
102+
correlation_id=deps.correlator.correlation_id,
103+
seq=0,
104+
event_id=event_id,
105+
patches=[
106+
{
107+
"op": "add",
108+
"path": "/-",
109+
"value": run.result.output, # type: ignore
110+
}
111+
],
112+
metadata={
113+
"agent_id": agent.id,
114+
"agent_name": agent.name,
115+
},
116+
timestamp=time.time(),
117+
)
118+
await deps.event_emitter.enqueue_event_chunk(cec)
119+
await deps.event_emitter.enqueue_status_event(
120+
correlation_id=deps.correlator.correlation_id,
121+
data=StatusEventData(type="status", status="ready"),
122+
event_id=event_id,
123+
)
124+
125+
126+
@agent_runner("pydantic_weather_agent")
127+
class WeatherAgentRunner(AgentRunner):
128+
async def run(self, context: Context, deps: Deps) -> bool:
129+
try:
130+
# read the agent object
131+
agent = await deps.read_agent(context.agent_id)
132+
if not agent:
133+
raise ValueError(f"Agent with id {context.agent_id} not found")
134+
135+
# read session events and expect the last event to be the user input
136+
user_input = await read_user_input(deps, context)
137+
if not user_input:
138+
raise ValueError("No user input found in session events")
139+
140+
deps.logger.info(
141+
f"User Input: {user_input} for session {context.session_id} and agent {agent.id}"
142+
)
143+
144+
await send_message(
145+
deps=deps,
146+
input=user_input,
147+
agent=agent,
148+
event_id=EventId(uuid.uuid4().hex),
149+
)
150+
151+
return True
152+
except Exception as e:
153+
await deps.event_emitter.enqueue_status_event(
154+
correlation_id=deps.correlator.correlation_id,
155+
data=StatusEventData(type="status", status="error", data=str(e)),
156+
)
157+
158+
return False
159+
finally:
160+
await deps.event_emitter.enqueue_status_event(
161+
correlation_id=deps.correlator.correlation_id,
162+
data=StatusEventData(type="status", status="completed"),
163+
)

examples/static/static_agent.py

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
import asyncio
22
import time
33
import uuid
4-
from typing import cast
54

65
from flux0_core.agent_runners.api import AgentRunner, Deps, agent_runner
76
from flux0_core.agent_runners.context import Context
87
from flux0_core.sessions import (
98
EventId,
10-
MessageEventData,
119
StatusEventData,
1210
)
11+
from flux0_stream.emitter.utils.events import send_processing_event
1312
from flux0_stream.types import ChunkEvent
1413

14+
from examples.utils.utils import read_user_input
15+
1516

1617
@agent_runner("static_agent")
1718
class StaticAgentRunner(AgentRunner):
@@ -21,33 +22,17 @@ async def run(self, context: Context, deps: Deps) -> bool:
2122
if not agent:
2223
raise ValueError(f"Agent with id {context.agent_id} not found")
2324

24-
# read session events and expect the last event to be the user input
25-
events = await deps.list_session_events(context.session_id)
26-
last_event = events[-1]
27-
if last_event.type != "message":
28-
return False
29-
user_event_data = cast(MessageEventData, last_event.data)
30-
for part in user_event_data["parts"]:
31-
if part["type"] == "content":
32-
user_input = part["content"]
33-
break
25+
# read session events and extract user input
26+
user_input = await read_user_input(deps, context)
3427
if not user_input:
35-
raise ValueError("No TextPart found in user event data")
28+
raise ValueError("No user input found in session events")
3629

3730
deps.logger.info(
3831
f"User Input: {user_input} for session {context.session_id} and agent {agent.id}"
3932
)
4033

4134
# send processing event, indicating that the agent is thinking
42-
await deps.event_emitter.enqueue_status_event(
43-
correlation_id=deps.correlator.correlation_id,
44-
data=StatusEventData(
45-
type="status",
46-
status="processing",
47-
# optionally include a detailed processing message
48-
data={"detail": "Thinking...!"},
49-
),
50-
)
35+
await send_processing_event(deps, "Thinking...!")
5136
await asyncio.sleep(1.5)
5237

5338
await deps.event_emitter.enqueue_status_event(

examples/utils/utils.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from typing import cast
2+
3+
from flux0_core.agent_runners.api import Deps
4+
from flux0_core.agent_runners.context import Context
5+
from flux0_core.sessions import MessageEventData
6+
7+
8+
async def read_user_input(deps: Deps, context: Context) -> str:
9+
"""Extract user input from the last message event."""
10+
11+
# read session events and expect the last event to be the user input
12+
user_input = None
13+
14+
events = await deps.list_session_events(context.session_id)
15+
last_event = events[-1]
16+
if last_event.type != "message":
17+
raise ValueError(f"Expected last event to be a message, got {last_event.type}")
18+
user_event_data = cast(MessageEventData, last_event.data)
19+
for part in user_event_data["parts"]:
20+
if part["type"] == "content":
21+
user_input = part["content"]
22+
break
23+
if not user_input:
24+
raise ValueError("No TextPart found in user event data")
25+
26+
return str(user_input)

0 commit comments

Comments
 (0)