From 3c1642f6672d9edc937aa59964fe74ba7b3dfb35 Mon Sep 17 00:00:00 2001
From: Rei Meguro <36625832+Orbital-Web@users.noreply.github.com>
Date: Wed, 11 Jun 2025 13:13:43 -0700
Subject: [PATCH 1/3] fix: answer streaming

---
 .../agent_search/kb_search/graph_utils.py     | 25 ------
 .../kb_search/nodes/d1_generate_answer.py     | 90 +++----------------
 2 files changed, 13 insertions(+), 102 deletions(-)

diff --git a/backend/onyx/agents/agent_search/kb_search/graph_utils.py b/backend/onyx/agents/agent_search/kb_search/graph_utils.py
index 7b59e3a2c2d..460ea4926a2 100644
--- a/backend/onyx/agents/agent_search/kb_search/graph_utils.py
+++ b/backend/onyx/agents/agent_search/kb_search/graph_utils.py
@@ -217,31 +217,6 @@ def stream_write_close_steps(writer: StreamWriter, level: int = 0) -> None:
     write_custom_event("stream_finished", stop_event, writer)
 
 
-def stream_write_close_main_answer(writer: StreamWriter, level: int = 0) -> None:
-    stop_event = StreamStopInfo(
-        stop_reason=StreamStopReason.FINISHED,
-        stream_type=StreamType.MAIN_ANSWER,
-        level=level,
-        level_question_num=0,
-    )
-    write_custom_event("stream_finished", stop_event, writer)
-
-
-def stream_write_main_answer_token(
-    writer: StreamWriter, token: str, level: int = 0, level_question_num: int = 0
-) -> None:
-    write_custom_event(
-        "initial_agent_answer",
-        AgentAnswerPiece(
-            answer_piece=token,  # No need to add space as tokenizer handles this
-            level=level,
-            level_question_num=level_question_num,
-            answer_type="agent_level_answer",
-        ),
-        writer,
-    )
-
-
 def get_doc_information_for_entity(entity_id_name: str) -> KGEntityDocInfo:
     """
     Get document information for an entity, including its semantic name and document details.
diff --git a/backend/onyx/agents/agent_search/kb_search/nodes/d1_generate_answer.py b/backend/onyx/agents/agent_search/kb_search/nodes/d1_generate_answer.py
index cc2dbc91709..43b0df91ae3 100644
--- a/backend/onyx/agents/agent_search/kb_search/nodes/d1_generate_answer.py
+++ b/backend/onyx/agents/agent_search/kb_search/nodes/d1_generate_answer.py
@@ -7,13 +7,7 @@
 from onyx.access.access import get_acl_for_user
 from onyx.agents.agent_search.kb_search.graph_utils import rename_entities_in_answer
-from onyx.agents.agent_search.kb_search.graph_utils import (
-    stream_write_close_main_answer,
-)
 from onyx.agents.agent_search.kb_search.graph_utils import stream_write_close_steps
-from onyx.agents.agent_search.kb_search.graph_utils import (
-    stream_write_main_answer_token,
-)
 from onyx.agents.agent_search.kb_search.ops import research
 from onyx.agents.agent_search.kb_search.states import MainOutput
 from onyx.agents.agent_search.kb_search.states import MainState
@@ -21,6 +15,7 @@
 from onyx.agents.agent_search.shared_graph_utils.calculations import (
     get_answer_generation_documents,
 )
+from onyx.agents.agent_search.shared_graph_utils.llm import stream_llm_answer
 from onyx.agents.agent_search.shared_graph_utils.utils import (
     get_langgraph_node_log_string,
 )
@@ -32,8 +27,6 @@
 from onyx.context.search.enums import SearchType
 from onyx.context.search.models import InferenceSection
 from onyx.db.engine import get_session_with_current_tenant
-from onyx.natural_language_processing.utils import BaseTokenizer
-from onyx.natural_language_processing.utils import get_tokenizer
 from onyx.prompts.kg_prompts import OUTPUT_FORMAT_NO_EXAMPLES_PROMPT
 from onyx.prompts.kg_prompts import OUTPUT_FORMAT_NO_OVERALL_ANSWER_PROMPT
 from onyx.tools.tool_implementations.search.search_tool import IndexFilters
@@ -45,17 +38,6 @@
 logger = setup_logger()
 
 
-def _stream_augmentations(
-    llm_tokenizer: BaseTokenizer, streaming_text: str, writer: StreamWriter
-) -> None:
-
-    # Tokenize and stream the reference results
-    tokens = llm_tokenizer.tokenize(streaming_text)
-    for token in tokens:
-
-        stream_write_main_answer_token(writer, token)
-
-
 def generate_answer(
     state: MainState, config: RunnableConfig, writer: StreamWriter = lambda _: None
 ) -> MainOutput:
@@ -221,70 +203,24 @@ def generate_answer(
             content=output_format_prompt,
         )
     ]
-    fast_llm = graph_config.tooling.fast_llm
-
-    dispatch_timings: list[float] = []
-    response: list[str] = []
-
-    def stream_answer() -> list[str]:
-        # Get the LLM's tokenizer
-        llm_tokenizer = get_tokenizer(
-            model_name=fast_llm.config.model_name,
-            provider_type=fast_llm.config.model_provider,
-        )
-
-        for message in fast_llm.stream(
-            prompt=msg,
-            timeout_override=30,
-            max_tokens=1000,
-        ):
-            # TODO: in principle, the answer here COULD contain images, but we don't support that yet
-            content = message.content
-            if not isinstance(content, str):
-                raise ValueError(
-                    f"Expected content to be a string, but got {type(content)}"
-                )
-
-            # Tokenize the content using the LLM's tokenizer
-            tokens = llm_tokenizer.tokenize(content)
-            for token in tokens:
-                start_stream_token = datetime.now()
-                stream_write_main_answer_token(
-                    writer, token, level=0, level_question_num=0
-                )
-                end_stream_token = datetime.now()
-                dispatch_timings.append(
-                    (end_stream_token - start_stream_token).microseconds
-                )
-                response.append(token)
-        return response
-
     try:
-        response = run_with_timeout(
+        run_with_timeout(
             KG_ANSWER_GENERATION_TIMEOUT,
-            stream_answer,
+            lambda: stream_llm_answer(
+                llm=graph_config.tooling.fast_llm,
+                prompt=msg,
+                event_name="initial_agent_answer",
+                writer=writer,
+                agent_answer_level=0,
+                agent_answer_question_num=0,
+                agent_answer_type="agent_level_answer",
+                timeout_override=30,
+                max_tokens=1000,
+            ),
         )
-
-        # llm_tokenizer = get_tokenizer(
-        #     model_name=fast_llm.config.model_name,
-        #     provider_type=fast_llm.config.model_provider,
-        # )
-
-        # TODO: the fake streaming should happen in friont-end. Revisit and then
-        # simply stream out here the full text in one.
-        # if reference_results_str:
-        #     # Get the LLM's tokenizer
-
-        #     _stream_augmentations(llm_tokenizer, reference_results_str, writer)
-
-        # if state.remarks:
-        #     _stream_augmentations(llm_tokenizer, "Comments: \n " + "\n".join(state.remarks), writer)
-
     except Exception as e:
         raise ValueError(f"Could not generate the answer. Error {e}")
 
-    stream_write_close_main_answer(writer)
-
     return MainOutput(
         log_messages=[
            get_langgraph_node_log_string(

From 0faf6706e0ca026fa31b7d545590209f0d576c52 Mon Sep 17 00:00:00 2001
From: Rei Meguro <36625832+Orbital-Web@users.noreply.github.com>
Date: Wed, 11 Jun 2025 14:07:24 -0700
Subject: [PATCH 2/3] fix: env vars

---
 .../kb_search/nodes/d1_generate_answer.py | 10 ++++++----
 backend/onyx/configs/kg_configs.py        | 16 ++++++++++++++--
 2 files changed, 20 insertions(+), 6 deletions(-)

diff --git a/backend/onyx/agents/agent_search/kb_search/nodes/d1_generate_answer.py b/backend/onyx/agents/agent_search/kb_search/nodes/d1_generate_answer.py
index 43b0df91ae3..f6c32f257bc 100644
--- a/backend/onyx/agents/agent_search/kb_search/nodes/d1_generate_answer.py
+++ b/backend/onyx/agents/agent_search/kb_search/nodes/d1_generate_answer.py
@@ -22,8 +22,10 @@
 from onyx.agents.agent_search.shared_graph_utils.utils import relevance_from_docs
 from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
 from onyx.chat.models import ExtendedToolResponse
-from onyx.configs.kg_configs import KG_ANSWER_GENERATION_TIMEOUT
+from onyx.configs.kg_configs import KG_MAX_TOKENS_ANSWER_GENERATION
 from onyx.configs.kg_configs import KG_RESEARCH_NUM_RETRIEVED_DOCS
+from onyx.configs.kg_configs import KG_TIMEOUT_CONNECT_LLM_INITIAL_ANSWER_GENERATION
+from onyx.configs.kg_configs import KG_TIMEOUT_LLM_INITIAL_ANSWER_GENERATION
 from onyx.context.search.enums import SearchType
 from onyx.context.search.models import InferenceSection
 from onyx.db.engine import get_session_with_current_tenant
@@ -205,7 +207,7 @@ def generate_answer(
     ]
     try:
         run_with_timeout(
-            KG_ANSWER_GENERATION_TIMEOUT,
+            KG_TIMEOUT_LLM_INITIAL_ANSWER_GENERATION,
             lambda: stream_llm_answer(
                 llm=graph_config.tooling.fast_llm,
                 prompt=msg,
@@ -214,8 +216,8 @@ def generate_answer(
                 agent_answer_level=0,
                 agent_answer_question_num=0,
                 agent_answer_type="agent_level_answer",
-                timeout_override=30,
-                max_tokens=1000,
+                timeout_override=KG_TIMEOUT_CONNECT_LLM_INITIAL_ANSWER_GENERATION,
+                max_tokens=KG_MAX_TOKENS_ANSWER_GENERATION,
             ),
         )
     except Exception as e:
diff --git a/backend/onyx/configs/kg_configs.py b/backend/onyx/configs/kg_configs.py
index 73678ae7753..ba24931a20a 100644
--- a/backend/onyx/configs/kg_configs.py
+++ b/backend/onyx/configs/kg_configs.py
@@ -42,8 +42,20 @@
     os.environ.get("KG_OBJECT_SOURCE_RESEARCH_TIMEOUT", "30")
 )
 
-KG_ANSWER_GENERATION_TIMEOUT: int = int(
-    os.environ.get("KG_ANSWER_GENERATION_TIMEOUT", "30")
+KG_TIMEOUT_LLM_INITIAL_ANSWER_GENERATION: int = int(
+    os.environ.get("KG_TIMEOUT_LLM_INITIAL_ANSWER_GENERATION", "45")
+)
+
+KG_TIMEOUT_CONNECT_LLM_INITIAL_ANSWER_GENERATION: int = int(
+    os.environ.get("KG_TIMEOUT_CONNECT_LLM_INITIAL_ANSWER_GENERATION", "15")
+)
+
+KG_MAX_TOKENS_ANSWER_GENERATION: int = int(
+    os.environ.get("KG_MAX_TOKENS_ANSWER_GENERATION", "1024")
+)
+
+KG_MAX_TOKENS_ANSWER_GENERATION: int = int(
+    os.environ.get("KG_MAX_TOKENS_ANSWER_GENERATION", "1024")
 )
 
 KG_MAX_DEEP_SEARCH_RESULTS: int = int(
     os.environ.get("KG_MAX_DEEP_SEARCH_RESULTS", "30")
 )

From c4fcc8e41e009f8032b88208ab1321fefb5fbed2 Mon Sep 17 00:00:00 2001
From: Rei Meguro <36625832+Orbital-Web@users.noreply.github.com>
Date: Wed, 11 Jun 2025 14:24:50 -0700
Subject: [PATCH 3/3] fix: remove duplicate

---
 backend/onyx/configs/kg_configs.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/backend/onyx/configs/kg_configs.py b/backend/onyx/configs/kg_configs.py
index ba24931a20a..781fc1c5851 100644
--- a/backend/onyx/configs/kg_configs.py
+++ b/backend/onyx/configs/kg_configs.py
@@ -54,10 +54,6 @@
     os.environ.get("KG_MAX_TOKENS_ANSWER_GENERATION", "1024")
 )
 
-KG_MAX_TOKENS_ANSWER_GENERATION: int = int(
-    os.environ.get("KG_MAX_TOKENS_ANSWER_GENERATION", "1024")
-)
-
 KG_MAX_DEEP_SEARCH_RESULTS: int = int(
     os.environ.get("KG_MAX_DEEP_SEARCH_RESULTS", "30")
 )