@@ -3,6 +3,7 @@
 import asyncio
 import contextvars
 import json
+import os
 import queue
 import threading
 from collections.abc import Generator
@@ -19,7 +20,6 @@
 from agents import Agent
 from agents import AgentHooks
 from agents import function_tool
-from agents import handoff
 from agents import ModelSettings
 from agents import RunContextWrapper
 from agents import Runner
@@ -30,6 +30,7 @@
 from agents.stream_events import RawResponsesStreamEvent
 from agents.stream_events import RunItemStreamEvent
 from braintrust import traced
+from openai.types import Reasoning
 from pydantic import BaseModel

 from onyx.agents.agent_search.dr.constants import MAX_CHAT_HISTORY_MESSAGES
@@ -129,12 +130,12 @@ def llm_completion(
     messages: List[Dict[str, Any]],
     stream: bool = False,
 ) -> litellm.ModelResponse:
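+    # NOTE: litellm.responses() mirrors the OpenAI Responses API, so the
+    # litellm.ModelResponse return annotation above is likely stale (assumption).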
-    return litellm.completion(
+    return litellm.responses(
         model=model_name,
-        temperature=temperature,
-        messages=messages,
+        input=messages,
         tools=[],
         stream=stream,
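+        # Requests a reasoning trace; assumes this litellm version accepts a
+        # litellm.Reasoning object here (unverified, see the PR linked below).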
+        reasoning=litellm.Reasoning(effort="medium", summary="detailed"),
     )


@@ -359,6 +360,7 @@ def construct_deep_research_agent(llm: LLM) -> Agent:
     - Compare sources and dates; prioritize recency for time-sensitive topics.
     - Minimize redundancy by skimming before fetching.
     - Think out loud in a compact way, but keep reasoning crisp.
+    - If context exceeds 10000 tokens, hand off to the compactor agent.
     """
     return Agent(
         name="Researcher",
@@ -377,7 +379,6 @@ def construct_deep_research_agent(llm: LLM) -> Agent:


 def unified_event_stream(
-    agent: Agent,
     messages: List[Dict[str, Any]],
     cfg: GraphConfig,
     llm: LLM,
@@ -386,28 +387,17 @@ def unified_event_stream(
 ) -> Generator[Dict[str, Any], None, None]:
     bus: Queue = Queue()
     emitter = Emitter(bus)
-    # start_run_in_thread(
-    #     agent=agent,
-    #     messages=messages,
-    #     cfg=cfg,
-    #     llm=llm,
-    #     search_tool=search_tool,
-    #     emitter=emitter,
-    # )
-
-    # Capture current context for propagation to worker thread
     current_context = contextvars.copy_context()
-
     t = threading.Thread(
         target=current_context.run,
         args=(
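+            # The turn is routed through the simple-agent worker for now;
+            # the deep-research worker stays commented out for easy switch-back.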
-            thread_worker_dr_turn,
+            # thread_worker_dr_turn,
+            thread_worker_simple_turn,
             messages,
             cfg,
             llm,
             emitter,
             search_tool,
-            None,
         ),  # eval_context=None for now
         daemon=True,
     )
@@ -437,9 +427,7 @@ def stream_chat_sync(
 ) -> Generator[Dict[str, Any], None, None]:
     bus: Queue = Queue()
     emitter = Emitter(bus)
-    agent = construct_deep_research_agent(llm)
     return unified_event_stream(
-        agent=agent,
         messages=messages,
         cfg=cfg,
         llm=llm,
@@ -452,25 +440,29 @@ def construct_simple_agent(
     llm: LLM,
 ) -> Agent:
     litellm_model = LitellmModel(
-        model=llm.config.model_name,
+        model="o3-mini",
         api_key=llm.config.api_key,
     )
     return Agent(
         name="Assistant",
         instructions="""
         You are a helpful assistant that can search the web, fetch content from URLs,
-        and search internal databases.
+        and search internal databases. Please do some reasoning and then return your answer.
         """,
         model=litellm_model,
         tools=[web_search, web_fetch, internal_search],
         model_settings=ModelSettings(
-            temperature=llm.config.temperature,
+            temperature=0.0,
             include_usage=True,  # Track usage metrics
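+            # Both summary and the older generate_summary are set, presumably
+            # for SDK compatibility; recent openai SDKs prefer summary alone (assumption).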
+            reasoning=Reasoning(
+                effort="medium", summary="detailed", generate_summary="detailed"
+            ),
+            verbose=True,
         ),
     )


-def thread_worker_dr_turn(messages, cfg, llm, emitter, search_tool, eval_context=None):
+def thread_worker_dr_turn(messages, cfg, llm, emitter, search_tool):
     """
     Worker function for deep research turn that runs in a separate thread.

@@ -483,12 +475,26 @@ def thread_worker_dr_turn(messages, cfg, llm, emitter, search_tool, eval_context
         eval_context: Evaluation context to be propagated to the worker thread
     """
     try:
-        dr_turn(messages, cfg, llm, emitter, search_tool, eval_context)
+        dr_turn(messages, cfg, llm, emitter, search_tool)
     except Exception as e:
         logger.error(f"Error in dr_turn: {e}", exc_info=e, stack_info=True)
         emitter.emit(kind="done", data={"ok": False})


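+# Runs simple_turn on a worker thread, mirroring thread_worker_dr_turn's
+# error handling and "done" signaling.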
+def thread_worker_simple_turn(messages, cfg, llm, emitter, search_tool):
+    try:
+        simple_turn(
+            messages=messages,
+            cfg=cfg,
+            llm=llm,
+            turn_event_stream_emitter=emitter,
+            search_tool=search_tool,
+        )
+    except Exception as e:
+        logger.error(f"Error in simple_turn: {e}", exc_info=e, stack_info=True)
+        emitter.emit(kind="done", data={"ok": False})
+
+
 SENTINEL = object()


@@ -557,13 +563,48 @@ def _do_cancel():
         self._loop.call_soon_threadsafe(_do_cancel)


+def simple_turn(
+    messages: List[Dict[str, Any]],
+    cfg: GraphConfig,
+    llm: LLM,
+    turn_event_stream_emitter: Emitter,
+    search_tool: SearchTool | None = None,
+) -> None:
+    llm_response = llm_completion(
+        model_name="gpt-5-mini",
+        temperature=0.0,
+        messages=messages,
+        stream=True,
+    )
+    llm_response.json()
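+    # The parsed response is discarded; this call appears to be a quick
+    # sanity check of llm_completion (assumption).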
+    simple_agent = construct_simple_agent(llm)
+    ctx = MyContext(
+        run_dependencies=RunDependencies(
+            search_tool=search_tool, emitter=turn_event_stream_emitter, llm=llm
+        )
+    )
+    bridge = StreamBridge(simple_agent, messages, ctx, max_turns=100).start()
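+    # RunItemStreamEvent marks items the SDK has assembled (messages, tool calls,
+    # reasoning items); RawResponsesStreamEvent wraps the raw model stream deltas.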
+    for ev in bridge.events():
+        if isinstance(ev, RunItemStreamEvent):
+            print("RUN ITEM STREAM EVENT!")
+            if ev.name == "reasoning_item_created":
+                print("REASONING!")
+                turn_event_stream_emitter.emit(
+                    kind="reasoning", data=ev.item.raw_item.model_dump()
+                )
+        elif isinstance(ev, RawResponsesStreamEvent):
+            print("RAW RESPONSES STREAM EVENT!")
+            print(ev.type)
+            turn_event_stream_emitter.emit(kind="agent", data=ev.data.model_dump())
+    turn_event_stream_emitter.emit(kind="done", data={"ok": True})
+
+
 def dr_turn(
     messages: List[Dict[str, Any]],
     cfg: GraphConfig,
     llm: LLM,
     turn_event_stream_emitter: Emitter,  # TurnEventStream is the primary output of the turn
     search_tool: SearchTool | None = None,
-    eval_context=None,
 ) -> None:
     """
     Execute a deep research turn with evaluation context support.
@@ -588,34 +629,14 @@ def dr_turn(
         turn_event_stream_emitter.emit(kind="done", data={"ok": True})
         return
     dr_agent = construct_deep_research_agent(llm)
-    compactor_agent = Agent(
-        name="Compactor",
-        instructions=f"""
-        {RECOMMENDED_PROMPT_PREFIX}
-        Summarize the full conversation so far into JSON with keys:\n
-        - summary: concise timeline of what happened so far\n
-        - facts: bullet list of stable facts (IDs, URLs, constraints)\n
-        - open_questions: bullet list of TODOs / follow-ups\n
-        Set already_compacted=true to prevent immediate re-compaction.
-        Then hand off to deep research agent.
-        """,
-        output_type=dict,
-        handoffs=[
-            handoff(
-                agent=dr_agent,
-                input_filter=compaction_input_filter,
-            )
-        ],
-        tool_use_behavior="stop_on_first_tool",
-    )
     ctx = MyContext(
         run_dependencies=RunDependencies(
             search_tool=search_tool,
             emitter=turn_event_stream_emitter,
             llm=llm,
         )
     )
-    bridge = StreamBridge(compactor_agent, messages, ctx, max_turns=100).start()
+    bridge = StreamBridge(dr_agent, messages, ctx, max_turns=100).start()
     for ev in bridge.events():
         if isinstance(ev, RunItemStreamEvent):
             pass
@@ -663,3 +684,37 @@ def get_clarification(
         stream=False,
     )
     return llm_response
+
+
+if __name__ == "__main__":
+    messages = [
+        {
+            "role": "user",
+            "content": """
+            Let $N$ denote the number of ordered triples of positive integers $(a, b, c)$ such that $a, b, c
+            \\leq 3^6$ and $a^3 + b^3 + c^3$ is a multiple of $3^7$. Find the remainder when $N$ is divided by $1000$.
+            """,
+        }
+    ]
+    # OpenAI reasoning is not supported yet due to: https://github.com/BerriAI/litellm/pull/14117
+    reasoning_agent = Agent(
+        name="Reasoning",
+        instructions="You are a reasoning agent. You are given a question and you need to reason about it.",
+        model=LitellmModel(
+            model="gpt-5-mini",
+            api_key=os.getenv("OPENAI_API_KEY"),
+        ),
+        tools=[],
+        model_settings=ModelSettings(
+            temperature=0.0,
+            reasoning=Reasoning(effort="medium", summary="detailed"),
+        ),
+    )
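+    # Note: reasoning_agent is constructed but never run below; only the
+    # direct llm_completion path is exercised (see the litellm PR linked above).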
+    llm_response = llm_completion(
+        model_name="gpt-5-mini",
+        temperature=0.0,
+        messages=messages,
+        stream=False,
+    )
+    x = llm_response.json()
+    print(x)