Skip to content

Commit 05e2479

Browse files
committed
chore(example):: add OpenAI Simple agent runner example
resolves #82
1 parent d9f43be commit 05e2479

File tree

3 files changed

+136
-0
lines changed

3 files changed

+136
-0
lines changed

examples/openai_simple/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Flux0 OpenAI Simple Example
2+
3+
This is a minimal Flux0 agent runner that uses the **OpenAI SDK** to generate streamed responses from a GPT model (e.g., `gpt-4o`) and emits Flux0-compatible streaming events for real-time updates.
4+
5+
## Features
6+
7+
- ✅ OpenAI SDK (v1.x+) integration using `AsyncOpenAI`.
8+
- ✅ Real-time streaming.
9+
10+
## Usage
11+
12+
Define the agent:
13+
14+
```bash
15+
flux0 agents create --name "OpenAI Simple" --type openai_simple
16+
```

examples/openai_simple/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from lagom import Container
2+
3+
from .agent import OpenAIChatAgentRunner
4+
5+
6+
async def init_module(container: Container) -> None:
7+
container[OpenAIChatAgentRunner] = OpenAIChatAgentRunner
8+
9+
10+
async def shutdown_module() -> None: ...

examples/openai_simple/agent.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# openai_chat.py
2+
3+
import asyncio
4+
import time
5+
import uuid
6+
from typing import cast
7+
8+
from flux0_core.agent_runners.api import AgentRunner, Deps, agent_runner
9+
from flux0_core.agent_runners.context import Context
10+
from flux0_core.sessions import EventId, MessageEventData, StatusEventData
11+
from flux0_stream.types import ChunkEvent, JsonPatchOperation
12+
from openai import AsyncOpenAI
13+
from openai.types.chat import ChatCompletionSystemMessageParam, ChatCompletionUserMessageParam
14+
15+
16+
@agent_runner("openai_simple")
17+
class OpenAIChatAgentRunner(AgentRunner):
18+
def __init__(self) -> None:
19+
self.client: AsyncOpenAI = AsyncOpenAI() # Uses env OPENAI_API_KEY
20+
21+
async def run(self, context: Context, deps: Deps) -> bool:
22+
agent = await deps.read_agent(context.agent_id)
23+
if not agent:
24+
raise ValueError(f"Agent {context.agent_id} not found")
25+
26+
# Get user input
27+
events = await deps.list_session_events(context.session_id)
28+
last_event = events[-1]
29+
if last_event.type != "message":
30+
return False
31+
32+
user_event_data = cast(MessageEventData, last_event.data)
33+
user_input = next(
34+
(part["content"] for part in user_event_data["parts"] if part["type"] == "content"),
35+
None,
36+
)
37+
if not user_input:
38+
raise ValueError("No user input content found")
39+
40+
event_id = EventId(uuid.uuid4().hex)
41+
seq = 0
42+
43+
try:
44+
# Emit initial processing + typing statuses
45+
await deps.event_emitter.enqueue_status_event(
46+
correlation_id=deps.correlator.correlation_id,
47+
data=StatusEventData(
48+
type="status", status="processing", data={"detail": "Thinking..."}
49+
),
50+
)
51+
await asyncio.sleep(0.5)
52+
53+
await deps.event_emitter.enqueue_status_event(
54+
correlation_id=deps.correlator.correlation_id,
55+
data=StatusEventData(type="status", status="typing"),
56+
)
57+
58+
messages: list[ChatCompletionSystemMessageParam | ChatCompletionUserMessageParam] = [
59+
ChatCompletionSystemMessageParam(role="system", content="Be concise."),
60+
ChatCompletionUserMessageParam(role="user", content=str(user_input)),
61+
]
62+
stream = await self.client.chat.completions.create(
63+
model="gpt-4o",
64+
stream=True,
65+
messages=messages,
66+
)
67+
68+
async for chunk in stream:
69+
delta = chunk.choices[0].delta.content
70+
if delta:
71+
patch: JsonPatchOperation = {
72+
"op": "add",
73+
"path": "/-",
74+
"value": delta,
75+
}
76+
chunk_event = ChunkEvent(
77+
correlation_id=deps.correlator.correlation_id,
78+
seq=seq,
79+
event_id=event_id,
80+
patches=[patch],
81+
metadata={"agent_id": agent.id, "agent_name": agent.name},
82+
timestamp=time.time(),
83+
)
84+
await deps.event_emitter.enqueue_event_chunk(chunk_event)
85+
seq += 1
86+
87+
# Send 'ready' when done
88+
await deps.event_emitter.enqueue_status_event(
89+
correlation_id=deps.correlator.correlation_id,
90+
event_id=event_id,
91+
data=StatusEventData(type="status", status="ready"),
92+
)
93+
return True
94+
95+
except Exception as e:
96+
await deps.event_emitter.enqueue_status_event(
97+
correlation_id=deps.correlator.correlation_id,
98+
data=StatusEventData(type="status", status="error", data=str(e)),
99+
)
100+
return False
101+
102+
finally:
103+
await deps.event_emitter.enqueue_status_event(
104+
correlation_id=deps.correlator.correlation_id,
105+
data=StatusEventData(
106+
type="status",
107+
status="completed",
108+
acknowledged_offset=0,
109+
),
110+
)

0 commit comments

Comments
 (0)