1 change: 1 addition & 0 deletions backend/onyx/agents/agent_search/dr/constants.py
@@ -27,4 +27,5 @@
DR_TIME_BUDGET_BY_TYPE = {
ResearchType.THOUGHTFUL: 3.0,
ResearchType.DEEP: 12.0,
ResearchType.FAST: 0.5,
}
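
With this entry, every non-legacy ResearchType has an explicit budget (FAST gets 0.5, a fraction of THOUGHTFUL's 3.0). A minimal sketch of how a caller can consume the table once it is keyed by the enum member itself, assuming only the two modules changed in this PR:

from onyx.agents.agent_search.dr.constants import DR_TIME_BUDGET_BY_TYPE
from onyx.agents.agent_search.dr.enums import ResearchType

def initial_budget(research_type: ResearchType) -> float:
    # Lookup by member: no per-type special-casing needed once FAST is in the table.
    return DR_TIME_BUDGET_BY_TYPE[research_type]

assert initial_budget(ResearchType.FAST) == 0.5
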
1 change: 1 addition & 0 deletions backend/onyx/agents/agent_search/dr/enums.py
@@ -8,6 +8,7 @@ class ResearchType(str, Enum):
LEGACY_AGENTIC = "LEGACY_AGENTIC" # only used for legacy agentic search migrations
THOUGHTFUL = "THOUGHTFUL"
DEEP = "DEEP"
FAST = "FAST"


class ResearchAnswerPurpose(str, Enum):
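Since ResearchType mixes in str, members compare equal to their string values, and membership checks against the class work for members; both properties are leaned on by the orchestrator changes below. A small self-contained illustration mirroring the enum above:

from enum import Enum

class ResearchType(str, Enum):
    LEGACY_AGENTIC = "LEGACY_AGENTIC"  # only used for legacy agentic search migrations
    THOUGHTFUL = "THOUGHTFUL"
    DEEP = "DEEP"
    FAST = "FAST"

rt = ResearchType.FAST
assert rt == "FAST"                                        # str mixin: equal to its value
assert rt in ResearchType                                  # guard used by the orchestrator
assert rt in [ResearchType.THOUGHTFUL, ResearchType.FAST]  # branch test used below
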
backend/onyx/agents/agent_search/dr/nodes/dr_a0_clarification.py
@@ -35,6 +35,8 @@
from onyx.agents.agent_search.shared_graph_utils.utils import run_with_timeout
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
from onyx.agents.agent_search.utils import create_question_prompt
from onyx.configs.agent_configs import TF_DR_TIMEOUT_LONG
from onyx.configs.agent_configs import TF_DR_TIMEOUT_SHORT
from onyx.configs.constants import DocumentSource
from onyx.configs.constants import DocumentSourceDescription
from onyx.configs.constants import TMP_DRALPHA_PERSONA_NAME
@@ -488,7 +490,7 @@ def clarifier(
)

answer_tokens, _, _ = run_with_timeout(
80,
TF_DR_TIMEOUT_LONG,
lambda: stream_llm_answer(
llm=graph_config.tooling.primary_llm,
prompt=create_question_prompt(
@@ -501,7 +503,7 @@
agent_answer_level=0,
agent_answer_question_num=0,
agent_answer_type="agent_level_answer",
timeout_override=60,
timeout_override=TF_DR_TIMEOUT_LONG,
ind=current_step_nr,
context_docs=None,
replace_citations=True,
@@ -612,7 +614,7 @@ def clarifier(

clarification = None

if research_type != ResearchType.THOUGHTFUL:
if research_type == ResearchType.DEEP:
result = _get_existing_clarification_request(graph_config)
if result is not None:
clarification, original_question, chat_history_string = result
@@ -645,7 +647,7 @@
assistant_system_prompt, clarification_prompt
),
schema=ClarificationGenerationResponse,
timeout_override=25,
timeout_override=TF_DR_TIMEOUT_SHORT,
# max_tokens=1500,
)
except Exception as e:
@@ -674,7 +676,7 @@
)

_, _, _ = run_with_timeout(
80,
TF_DR_TIMEOUT_LONG,
lambda: stream_llm_answer(
llm=graph_config.tooling.primary_llm,
prompt=repeat_prompt,
@@ -683,7 +685,7 @@
agent_answer_level=0,
agent_answer_question_num=0,
agent_answer_type="agent_level_answer",
timeout_override=60,
timeout_override=TF_DR_TIMEOUT_LONG,
answer_piece=StreamingType.MESSAGE_DELTA.value,
ind=current_step_nr,
# max_tokens=None,
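The edits in this file are all of one kind: the magic timeout literals (80 for the outer run_with_timeout guard, 60 and 25 for the inner LLM timeout_override) become the shared TF_DR_TIMEOUT_LONG / TF_DR_TIMEOUT_SHORT settings from onyx.configs.agent_configs, so clarifier latency is tunable in one place. A reduced, hypothetical sketch of the nesting (stand-in values and signatures, not the Onyx helpers):

TF_DR_TIMEOUT_SHORT = 25.0  # assumed default; the real value lives in agent_configs
TF_DR_TIMEOUT_LONG = 80.0   # assumed default; the real value lives in agent_configs

def run_with_timeout(timeout: float, fn):
    # Stand-in for the real helper: invoke fn, enforcing a wall-clock limit.
    return fn()

def stream_answer(timeout_override: float) -> list[str]:
    return [f"streamed within {timeout_override}s"]

# Outer watchdog and inner per-request timeout now share one named constant
# instead of the diverging 80/60 pair in the old code.
tokens = run_with_timeout(TF_DR_TIMEOUT_LONG, lambda: stream_answer(TF_DR_TIMEOUT_LONG))
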
133 changes: 86 additions & 47 deletions backend/onyx/agents/agent_search/dr/nodes/dr_a1_orchestrator.py
@@ -30,6 +30,8 @@
from onyx.agents.agent_search.shared_graph_utils.utils import run_with_timeout
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
from onyx.agents.agent_search.utils import create_question_prompt
from onyx.configs.agent_configs import TF_DR_TIMEOUT_LONG
from onyx.configs.agent_configs import TF_DR_TIMEOUT_SHORT
from onyx.kg.utils.extraction_utils import get_entity_types_str
from onyx.kg.utils.extraction_utils import get_relationship_types_str
from onyx.prompts.dr_prompts import DEFAULLT_DECISION_PROMPT
@@ -170,11 +172,39 @@ def orchestrator(
reasoning_result = "(No reasoning result provided yet.)"
tool_calls_string = "(No tool calls provided yet.)"

if research_type == ResearchType.THOUGHTFUL:
if research_type not in ResearchType:
raise ValueError(f"Invalid research type: {research_type}")

if research_type in [ResearchType.THOUGHTFUL, ResearchType.FAST]:
if iteration_nr == 1:
remaining_time_budget = DR_TIME_BUDGET_BY_TYPE[ResearchType.THOUGHTFUL]
remaining_time_budget = DR_TIME_BUDGET_BY_TYPE[research_type]

elif remaining_time_budget <= 0:
return OrchestrationUpdate(
tools_used=[DRPath.CLOSER.value],
current_step_nr=current_step_nr,
query_list=[],
iteration_nr=iteration_nr,
log_messages=[
get_langgraph_node_log_string(
graph_component="main",
node_name="orchestrator",
node_start_time=node_start_time,
)
],
plan_of_record=plan_of_record,
remaining_time_budget=remaining_time_budget,
iteration_instructions=[
IterationInstructions(
iteration_nr=iteration_nr,
plan=None,
reasoning="Time to wrap up.",
purpose="",
)
],
)

elif iteration_nr > 1:
elif iteration_nr > 1 and remaining_time_budget > 0:
# for each iteration past the first one, we need to see whether we
# have enough information to answer the question.
# if we do, we can stop the iteration and return the answer.
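
The new budget gate above gives FAST and THOUGHTFUL runs the same shape: iteration 1 seeds remaining_time_budget from DR_TIME_BUDGET_BY_TYPE, and an exhausted budget short-circuits straight to the closer with a "Time to wrap up." instruction. The control flow in miniature (hypothetical helper, not the LangGraph node):

from onyx.agents.agent_search.dr.constants import DR_TIME_BUDGET_BY_TYPE
from onyx.agents.agent_search.dr.enums import ResearchType

CLOSER = "CLOSER"  # stand-in for DRPath.CLOSER.value

def route(research_type: ResearchType, iteration_nr: int, budget: float) -> tuple[str | None, float]:
    if iteration_nr == 1:
        budget = DR_TIME_BUDGET_BY_TYPE[research_type]  # FAST starts at 0.5
    elif budget <= 0:
        return CLOSER, budget  # budget spent: skip deciding, go wrap up
    return None, budget        # budget left: let the decision LLM pick a tool

assert route(ResearchType.FAST, 2, 0.0) == (CLOSER, 0.0)
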
@@ -200,7 +230,7 @@
reasoning_tokens: list[str] = [""]

reasoning_tokens, _, _ = run_with_timeout(
80,
TF_DR_TIMEOUT_LONG,
lambda: stream_llm_answer(
llm=graph_config.tooling.primary_llm,
prompt=create_question_prompt(
@@ -211,7 +241,7 @@
agent_answer_level=0,
agent_answer_question_num=0,
agent_answer_type="agent_level_answer",
timeout_override=60,
timeout_override=TF_DR_TIMEOUT_LONG,
answer_piece=StreamingType.REASONING_DELTA.value,
ind=current_step_nr,
# max_tokens=None,
@@ -297,7 +327,7 @@
decision_prompt,
),
schema=OrchestratorDecisonsNoPlan,
timeout_override=35,
timeout_override=TF_DR_TIMEOUT_SHORT,
# max_tokens=2500,
)
next_step = orchestrator_action.next_step
@@ -320,7 +350,7 @@
reasoning_result = "Time to wrap up."
next_tool_name = DRPath.CLOSER.value

else:
elif research_type == ResearchType.DEEP:
if iteration_nr == 1 and not plan_of_record:
# by default, we start a new iteration, but if there is a feedback request,
# we start a new iteration 0 again (set a bit later)
@@ -348,7 +378,7 @@
plan_generation_prompt,
),
schema=OrchestrationPlan,
timeout_override=25,
timeout_override=TF_DR_TIMEOUT_SHORT,
# max_tokens=3000,
)
except Exception as e:
@@ -368,7 +398,7 @@
)

_, _, _ = run_with_timeout(
80,
TF_DR_TIMEOUT_LONG,
lambda: stream_llm_answer(
llm=graph_config.tooling.primary_llm,
prompt=repeat_plan_prompt,
@@ -377,7 +407,7 @@
agent_answer_level=0,
agent_answer_question_num=0,
agent_answer_type="agent_level_answer",
timeout_override=60,
timeout_override=TF_DR_TIMEOUT_LONG,
answer_piece=StreamingType.REASONING_DELTA.value,
ind=current_step_nr,
),
@@ -426,7 +456,7 @@
decision_prompt,
),
schema=OrchestratorDecisonsNoPlan,
timeout_override=15,
timeout_override=TF_DR_TIMEOUT_LONG,
# max_tokens=1500,
)
next_step = orchestrator_action.next_step
@@ -460,7 +490,7 @@
)

_, _, _ = run_with_timeout(
80,
TF_DR_TIMEOUT_LONG,
lambda: stream_llm_answer(
llm=graph_config.tooling.primary_llm,
prompt=repeat_reasoning_prompt,
@@ -469,7 +499,7 @@
agent_answer_level=0,
agent_answer_question_num=0,
agent_answer_type="agent_level_answer",
timeout_override=60,
timeout_override=TF_DR_TIMEOUT_LONG,
answer_piece=StreamingType.REASONING_DELTA.value,
ind=current_step_nr,
# max_tokens=None,
@@ -484,6 +514,9 @@

current_step_nr += 1

else:
raise NotImplementedError(f"Research type {research_type} is not implemented.")
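
With FAST folded into the budgeted branch and DEEP handled explicitly, the trailing else now means "a ResearchType nobody wrote a branch for", and failing loudly beats silently running the DEEP path. The dispatch pattern in miniature, reusing the enum from this PR:

from onyx.agents.agent_search.dr.enums import ResearchType

def dispatch(research_type: ResearchType) -> str:
    if research_type in [ResearchType.THOUGHTFUL, ResearchType.FAST]:
        return "budgeted loop, no plan of record"
    elif research_type == ResearchType.DEEP:
        return "plan of record + per-iteration reasoning"
    else:
        # e.g. LEGACY_AGENTIC, or a member added later without a branch
        raise NotImplementedError(f"Research type {research_type} is not implemented.")
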

base_next_step_purpose_prompt = get_dr_prompt_orchestration_templates(
DRPromptPurpose.NEXT_STEP_PURPOSE,
ResearchType.DEEP,
@@ -498,48 +531,54 @@
)

purpose_tokens: list[str] = [""]
purpose = ""

try:
if research_type in [ResearchType.THOUGHTFUL, ResearchType.DEEP]:

write_custom_event(
current_step_nr,
ReasoningStart(),
writer,
)
try:

purpose_tokens, _, _ = run_with_timeout(
80,
lambda: stream_llm_answer(
llm=graph_config.tooling.primary_llm,
prompt=create_question_prompt(
decision_system_prompt,
orchestration_next_step_purpose_prompt,
write_custom_event(
current_step_nr,
ReasoningStart(),
writer,
)

purpose_tokens, _, _ = run_with_timeout(
TF_DR_TIMEOUT_LONG,
lambda: stream_llm_answer(
llm=graph_config.tooling.primary_llm,
prompt=create_question_prompt(
decision_system_prompt,
orchestration_next_step_purpose_prompt,
),
event_name="basic_response",
writer=writer,
agent_answer_level=0,
agent_answer_question_num=0,
agent_answer_type="agent_level_answer",
timeout_override=TF_DR_TIMEOUT_LONG,
answer_piece=StreamingType.REASONING_DELTA.value,
ind=current_step_nr,
# max_tokens=None,
),
event_name="basic_response",
writer=writer,
agent_answer_level=0,
agent_answer_question_num=0,
agent_answer_type="agent_level_answer",
timeout_override=60,
answer_piece=StreamingType.REASONING_DELTA.value,
ind=current_step_nr,
# max_tokens=None,
),
)
)

write_custom_event(
current_step_nr,
SectionEnd(),
writer,
)
write_custom_event(
current_step_nr,
SectionEnd(),
writer,
)

current_step_nr += 1
current_step_nr += 1

except Exception as e:
logger.error("Error in orchestration next step purpose.")
raise e

except Exception as e:
logger.error(f"Error in orchestration next step purpose: {e}")
raise e
purpose = cast(str, merge_content(*purpose_tokens))

purpose = cast(str, merge_content(*purpose_tokens))
elif research_type == ResearchType.FAST:
purpose = f"Answering the question using the {next_tool_name}"

if not next_tool_name:
raise ValueError("The next step has not been defined. This should not happen.")
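Note the asymmetry introduced at the end of this hunk: THOUGHTFUL and DEEP still stream the step's purpose from the LLM (inside the new if block), while FAST composes it from next_tool_name with no model call, trading the streamed reasoning for one less round-trip. Schematically (hypothetical stub for the streamed branch):

from onyx.agents.agent_search.dr.enums import ResearchType

def streamed_purpose() -> str:
    return "merged reasoning tokens"  # stand-in for stream_llm_answer + merge_content

def step_purpose(research_type: ResearchType, next_tool_name: str) -> str:
    purpose = ""
    if research_type in [ResearchType.THOUGHTFUL, ResearchType.DEEP]:
        purpose = streamed_purpose()  # LLM round-trip, streamed to the UI
    elif research_type == ResearchType.FAST:
        purpose = f"Answering the question using the {next_tool_name}"  # no LLM call
    return purpose
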
13 changes: 8 additions & 5 deletions backend/onyx/agents/agent_search/dr/nodes/dr_a2_closer.py
@@ -33,6 +33,7 @@
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
from onyx.agents.agent_search.utils import create_question_prompt
from onyx.chat.chat_utils import llm_doc_from_inference_section
from onyx.configs.agent_configs import TF_DR_TIMEOUT_LONG
from onyx.context.search.models import InferenceSection
from onyx.db.chat import create_search_doc_from_inference_section
from onyx.db.chat import update_db_session_with_messages
@@ -276,7 +277,7 @@ def closer(
test_info_complete_prompt + (assistant_task_prompt or ""),
),
schema=TestInfoCompleteResponse,
timeout_override=40,
timeout_override=TF_DR_TIMEOUT_LONG,
# max_tokens=1000,
)

@@ -311,10 +312,12 @@ def closer(
writer,
)

if research_type == ResearchType.THOUGHTFUL:
if research_type in [ResearchType.THOUGHTFUL, ResearchType.FAST]:
final_answer_base_prompt = FINAL_ANSWER_PROMPT_WITHOUT_SUB_ANSWERS
else:
elif research_type == ResearchType.DEEP:
final_answer_base_prompt = FINAL_ANSWER_PROMPT_W_SUB_ANSWERS
else:
raise ValueError(f"Invalid research type: {research_type}")

estimated_final_answer_prompt_tokens = check_number_of_tokens(
final_answer_base_prompt.build(
@@ -353,7 +356,7 @@

try:
streamed_output, _, citation_infos = run_with_timeout(
240,
int(3 * TF_DR_TIMEOUT_LONG),
lambda: stream_llm_answer(
llm=graph_config.tooling.primary_llm,
prompt=create_question_prompt(
Expand All @@ -365,7 +368,7 @@ def closer(
agent_answer_level=0,
agent_answer_question_num=0,
agent_answer_type="agent_level_answer",
timeout_override=60,
timeout_override=int(2 * TF_DR_TIMEOUT_LONG),
answer_piece=StreamingType.MESSAGE_DELTA.value,
ind=current_step_nr,
context_docs=all_context_llmdocs,
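The closer keeps the same constants but scales them, since the final-answer stream is the longest LLM call in the graph: the hard-coded 240-second outer guard becomes int(3 * TF_DR_TIMEOUT_LONG) and the 60-second inner timeout becomes int(2 * TF_DR_TIMEOUT_LONG). A quick check of the arithmetic under the assumption that TF_DR_TIMEOUT_LONG defaults to 80 (the literal it replaces elsewhere in this PR):

TF_DR_TIMEOUT_LONG = 80  # assumed default, matching the replaced 80-second literals

outer = int(3 * TF_DR_TIMEOUT_LONG)  # 240 -- unchanged from the old hard-coded value
inner = int(2 * TF_DR_TIMEOUT_LONG)  # 160 -- was 60, so the inner call gets more headroom
print(outer, inner)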