diff --git a/backend/onyx/agents/agent_search/dr/nodes/dr_a0_clarification.py b/backend/onyx/agents/agent_search/dr/nodes/dr_a0_clarification.py index a9371ea6925..1767ec5cae7 100644 --- a/backend/onyx/agents/agent_search/dr/nodes/dr_a0_clarification.py +++ b/backend/onyx/agents/agent_search/dr/nodes/dr_a0_clarification.py @@ -3,6 +3,7 @@ from typing import Any from typing import cast +from braintrust import traced from langchain_core.messages import HumanMessage from langchain_core.messages import merge_content from langchain_core.runnables import RunnableConfig @@ -22,6 +23,9 @@ from onyx.agents.agent_search.dr.models import DRPromptPurpose from onyx.agents.agent_search.dr.models import OrchestrationClarificationInfo from onyx.agents.agent_search.dr.models import OrchestratorTool +from onyx.agents.agent_search.dr.process_llm_stream import ( + BasicSearchProcessedStreamResults, +) from onyx.agents.agent_search.dr.process_llm_stream import process_llm_stream from onyx.agents.agent_search.dr.states import MainState from onyx.agents.agent_search.dr.states import OrchestrationSetup @@ -666,28 +670,30 @@ def clarifier( system_prompt_to_use = assistant_system_prompt user_prompt_to_use = decision_prompt + assistant_task_prompt - stream = graph_config.tooling.primary_llm.stream( - prompt=create_question_prompt( - cast(str, system_prompt_to_use), - cast(str, user_prompt_to_use), - uploaded_image_context=uploaded_image_context, - ), - tools=([_ARTIFICIAL_ALL_ENCOMPASSING_TOOL]), - tool_choice=(None), - structured_response_format=graph_config.inputs.structured_response_format, - ) - - full_response = process_llm_stream( - messages=stream, - should_stream_answer=True, - writer=writer, - ind=0, - final_search_results=context_llm_docs, - displayed_search_results=context_llm_docs, - generate_final_answer=True, - chat_message_id=str(graph_config.persistence.chat_session_id), - ) + @traced(name="clarifier stream and process", type="llm") + def stream_and_process() -> BasicSearchProcessedStreamResults: + stream = graph_config.tooling.primary_llm.stream( + prompt=create_question_prompt( + cast(str, system_prompt_to_use), + cast(str, user_prompt_to_use), + uploaded_image_context=uploaded_image_context, + ), + tools=([_ARTIFICIAL_ALL_ENCOMPASSING_TOOL]), + tool_choice=(None), + structured_response_format=graph_config.inputs.structured_response_format, + ) + return process_llm_stream( + messages=stream, + should_stream_answer=True, + writer=writer, + ind=0, + final_search_results=context_llm_docs, + displayed_search_results=context_llm_docs, + generate_final_answer=True, + chat_message_id=str(graph_config.persistence.chat_session_id), + ) + full_response = stream_and_process() if len(full_response.ai_message_chunk.tool_calls) == 0: if isinstance(full_response.full_answer, str): diff --git a/backend/onyx/agents/agent_search/orchestration/nodes/call_tool.py b/backend/onyx/agents/agent_search/orchestration/nodes/call_tool.py deleted file mode 100644 index ece6e76b792..00000000000 --- a/backend/onyx/agents/agent_search/orchestration/nodes/call_tool.py +++ /dev/null @@ -1,71 +0,0 @@ -from typing import cast - -from langchain_core.messages import AIMessageChunk -from langchain_core.messages.tool import ToolCall -from langchain_core.runnables.config import RunnableConfig -from langgraph.types import StreamWriter - -from onyx.agents.agent_search.models import GraphConfig -from onyx.agents.agent_search.orchestration.states import ToolCallOutput -from onyx.agents.agent_search.orchestration.states import ToolCallUpdate -from onyx.agents.agent_search.orchestration.states import ToolChoiceUpdate -from onyx.tools.message import build_tool_message -from onyx.tools.message import ToolCallSummary -from onyx.tools.tool_runner import ToolRunner -from onyx.utils.logger import setup_logger - - -logger = setup_logger() - - -class ToolCallException(Exception): - """Exception raised for errors during tool calls.""" - - -def call_tool( - state: ToolChoiceUpdate, - config: RunnableConfig, - writer: StreamWriter = lambda _: None, -) -> ToolCallUpdate: - """Calls the tool specified in the state and updates the state with the result""" - - cast(GraphConfig, config["metadata"]["config"]) - - tool_choice = state.tool_choice - if tool_choice is None: - raise ValueError("Cannot invoke tool call node without a tool choice") - - tool = tool_choice.tool - tool_args = tool_choice.tool_args - tool_id = tool_choice.id - tool_runner = ToolRunner( - tool, tool_args, override_kwargs=tool_choice.search_tool_override_kwargs - ) - tool_kickoff = tool_runner.kickoff() - - try: - tool_responses = [] - for response in tool_runner.tool_responses(): - tool_responses.append(response) - - tool_final_result = tool_runner.tool_final_result() - except Exception as e: - raise ToolCallException( - f"Error during tool call for {tool.display_name}: {e}" - ) from e - - tool_call = ToolCall(name=tool.name, args=tool_args, id=tool_id) - tool_call_summary = ToolCallSummary( - tool_call_request=AIMessageChunk(content="", tool_calls=[tool_call]), - tool_call_result=build_tool_message( - tool_call, tool_runner.tool_message_content() - ), - ) - - tool_call_output = ToolCallOutput( - tool_call_summary=tool_call_summary, - tool_call_kickoff=tool_kickoff, - tool_call_responses=tool_responses, - tool_call_final_result=tool_final_result, - ) - return ToolCallUpdate(tool_call_output=tool_call_output) diff --git a/backend/onyx/agents/agent_search/orchestration/nodes/choose_tool.py b/backend/onyx/agents/agent_search/orchestration/nodes/choose_tool.py deleted file mode 100644 index 3a6b56dccbd..00000000000 --- a/backend/onyx/agents/agent_search/orchestration/nodes/choose_tool.py +++ /dev/null @@ -1,354 +0,0 @@ -from typing import cast -from uuid import uuid4 - -from langchain_core.messages import AIMessage -from langchain_core.messages import HumanMessage -from langchain_core.messages import ToolCall -from langchain_core.runnables.config import RunnableConfig -from langgraph.types import StreamWriter - -from onyx.agents.agent_search.dr.process_llm_stream import process_llm_stream -from onyx.agents.agent_search.models import GraphConfig -from onyx.agents.agent_search.orchestration.states import ToolChoice -from onyx.agents.agent_search.orchestration.states import ToolChoiceState -from onyx.agents.agent_search.orchestration.states import ToolChoiceUpdate -from onyx.agents.agent_search.shared_graph_utils.models import QueryExpansionType -from onyx.chat.prompt_builder.answer_prompt_builder import AnswerPromptBuilder -from onyx.chat.tool_handling.tool_response_handler import get_tool_by_name -from onyx.chat.tool_handling.tool_response_handler import ( - get_tool_call_for_non_tool_calling_llm_impl, -) -from onyx.configs.chat_configs import USE_SEMANTIC_KEYWORD_EXPANSIONS_BASIC_SEARCH -from onyx.context.search.preprocessing.preprocessing import query_analysis -from onyx.context.search.retrieval.search_runner import get_query_embedding -from onyx.llm.factory import get_default_llms -from onyx.prompts.chat_prompts import QUERY_KEYWORD_EXPANSION_WITH_HISTORY_PROMPT -from onyx.prompts.chat_prompts import QUERY_KEYWORD_EXPANSION_WITHOUT_HISTORY_PROMPT -from onyx.prompts.chat_prompts import QUERY_SEMANTIC_EXPANSION_WITH_HISTORY_PROMPT -from onyx.prompts.chat_prompts import QUERY_SEMANTIC_EXPANSION_WITHOUT_HISTORY_PROMPT -from onyx.tools.models import QueryExpansions -from onyx.tools.models import SearchToolOverrideKwargs -from onyx.tools.tool import Tool -from onyx.tools.tool_implementations.search.search_tool import SearchTool -from onyx.utils.logger import setup_logger -from onyx.utils.threadpool_concurrency import run_in_background -from onyx.utils.threadpool_concurrency import TimeoutThread -from onyx.utils.threadpool_concurrency import wait_on_background -from onyx.utils.timing import log_function_time -from shared_configs.model_server_models import Embedding - -logger = setup_logger() - - -def _create_history_str(prompt_builder: AnswerPromptBuilder) -> str: - # TODO: Add trimming logic - history_segments = [] - for msg in prompt_builder.message_history: - if isinstance(msg, HumanMessage): - role = "User" - elif isinstance(msg, AIMessage): - role = "Assistant" - else: - continue - history_segments.append(f"{role}:\n {msg.content}\n\n") - return "\n".join(history_segments) - - -def _expand_query( - query: str, - expansion_type: QueryExpansionType, - prompt_builder: AnswerPromptBuilder, -) -> str: - - history_str = _create_history_str(prompt_builder) - - if history_str: - if expansion_type == QueryExpansionType.KEYWORD: - base_prompt = QUERY_KEYWORD_EXPANSION_WITH_HISTORY_PROMPT - else: - base_prompt = QUERY_SEMANTIC_EXPANSION_WITH_HISTORY_PROMPT - expansion_prompt = base_prompt.format(question=query, history=history_str) - else: - if expansion_type == QueryExpansionType.KEYWORD: - base_prompt = QUERY_KEYWORD_EXPANSION_WITHOUT_HISTORY_PROMPT - else: - base_prompt = QUERY_SEMANTIC_EXPANSION_WITHOUT_HISTORY_PROMPT - expansion_prompt = base_prompt.format(question=query) - - msg = HumanMessage(content=expansion_prompt) - primary_llm, _ = get_default_llms() - response = primary_llm.invoke([msg]) - rephrased_query: str = cast(str, response.content) - - return rephrased_query - - -def _expand_query_non_tool_calling_llm( - expanded_keyword_thread: TimeoutThread[str], - expanded_semantic_thread: TimeoutThread[str], -) -> QueryExpansions | None: - keyword_expansion: str | None = wait_on_background(expanded_keyword_thread) - semantic_expansion: str | None = wait_on_background(expanded_semantic_thread) - - if keyword_expansion is None or semantic_expansion is None: - return None - - return QueryExpansions( - keywords_expansions=[keyword_expansion], - semantic_expansions=[semantic_expansion], - ) - - -# TODO: break this out into an implementation function -# and a function that handles extracting the necessary fields -# from the state and config -# TODO: fan-out to multiple tool call nodes? Make this configurable? -@log_function_time(print_only=True) -def choose_tool( - state: ToolChoiceState, - config: RunnableConfig, - writer: StreamWriter = lambda _: None, -) -> ToolChoiceUpdate: - """ - This node is responsible for calling the LLM to choose a tool. If no tool is chosen, - The node MAY emit an answer, depending on whether state["should_stream_answer"] is set. - """ - should_stream_answer = state.should_stream_answer - - agent_config = cast(GraphConfig, config["metadata"]["config"]) - - force_use_tool = agent_config.tooling.force_use_tool - - embedding_thread: TimeoutThread[Embedding] | None = None - keyword_thread: TimeoutThread[tuple[bool, list[str]]] | None = None - expanded_keyword_thread: TimeoutThread[str] | None = None - expanded_semantic_thread: TimeoutThread[str] | None = None - # If we have override_kwargs, add them to the tool_args - override_kwargs: SearchToolOverrideKwargs = ( - force_use_tool.override_kwargs or SearchToolOverrideKwargs() - ) - override_kwargs.original_query = agent_config.inputs.prompt_builder.raw_user_query - - using_tool_calling_llm = agent_config.tooling.using_tool_calling_llm - prompt_builder = state.prompt_snapshot or agent_config.inputs.prompt_builder - - llm = agent_config.tooling.primary_llm - skip_gen_ai_answer_generation = agent_config.behavior.skip_gen_ai_answer_generation - - if ( - not agent_config.behavior.use_agentic_search - and agent_config.tooling.search_tool is not None - and ( - not force_use_tool.force_use or force_use_tool.tool_name == SearchTool._NAME - ) - ): - # Run in a background thread to avoid blocking the main thread - embedding_thread = run_in_background( - get_query_embedding, - agent_config.inputs.prompt_builder.raw_user_query, - agent_config.persistence.db_session, - ) - keyword_thread = run_in_background( - query_analysis, - agent_config.inputs.prompt_builder.raw_user_query, - ) - - if USE_SEMANTIC_KEYWORD_EXPANSIONS_BASIC_SEARCH: - - expanded_keyword_thread = run_in_background( - _expand_query, - agent_config.inputs.prompt_builder.raw_user_query, - QueryExpansionType.KEYWORD, - prompt_builder, - ) - expanded_semantic_thread = run_in_background( - _expand_query, - agent_config.inputs.prompt_builder.raw_user_query, - QueryExpansionType.SEMANTIC, - prompt_builder, - ) - - structured_response_format = agent_config.inputs.structured_response_format - tools = [ - tool for tool in (agent_config.tooling.tools or []) if tool.name in state.tools - ] - - tool, tool_args = None, None - if force_use_tool.force_use and force_use_tool.args is not None: - tool_name, tool_args = ( - force_use_tool.tool_name, - force_use_tool.args, - ) - tool = get_tool_by_name(tools, tool_name) - - # special pre-logic for non-tool calling LLM case - elif not using_tool_calling_llm and tools: - chosen_tool_and_args = get_tool_call_for_non_tool_calling_llm_impl( - force_use_tool=force_use_tool, - tools=tools, - prompt_builder=prompt_builder, - llm=llm, - ) - if chosen_tool_and_args: - tool, tool_args = chosen_tool_and_args - - # If we have a tool and tool args, we are ready to request a tool call. - # This only happens if the tool call was forced or we are using a non-tool calling LLM. - if tool and tool_args: - if embedding_thread and tool.name == SearchTool._NAME: - # Wait for the embedding thread to finish - embedding = wait_on_background(embedding_thread) - override_kwargs.precomputed_query_embedding = embedding - if keyword_thread and tool.name == SearchTool._NAME: - is_keyword, keywords = wait_on_background(keyword_thread) - override_kwargs.precomputed_is_keyword = is_keyword - override_kwargs.precomputed_keywords = keywords - # dual keyword expansion needs to be added here for non-tool calling LLM case - if ( - USE_SEMANTIC_KEYWORD_EXPANSIONS_BASIC_SEARCH - and expanded_keyword_thread - and expanded_semantic_thread - and tool.name == SearchTool._NAME - ): - override_kwargs.expanded_queries = _expand_query_non_tool_calling_llm( - expanded_keyword_thread=expanded_keyword_thread, - expanded_semantic_thread=expanded_semantic_thread, - ) - if ( - USE_SEMANTIC_KEYWORD_EXPANSIONS_BASIC_SEARCH - and tool.name == SearchTool._NAME - and override_kwargs.expanded_queries - ): - if ( - override_kwargs.expanded_queries.keywords_expansions is None - or override_kwargs.expanded_queries.semantic_expansions is None - ): - raise ValueError("No expanded keyword or semantic threads found.") - - return ToolChoiceUpdate( - tool_choice=ToolChoice( - tool=tool, - tool_args=tool_args, - id=str(uuid4()), - search_tool_override_kwargs=override_kwargs, - ), - ) - - # if we're skipping gen ai answer generation, we should only - # continue if we're forcing a tool call (which will be emitted by - # the tool calling llm in the stream() below) - if skip_gen_ai_answer_generation and not force_use_tool.force_use: - return ToolChoiceUpdate( - tool_choice=None, - ) - - built_prompt = ( - prompt_builder.build() - if isinstance(prompt_builder, AnswerPromptBuilder) - else prompt_builder.built_prompt - ) - # At this point, we are either using a tool calling LLM or we are skipping the tool call. - # DEBUG: good breakpoint - stream = llm.stream( - # For tool calling LLMs, we want to insert the task prompt as part of this flow, this is because the LLM - # may choose to not call any tools and just generate the answer, in which case the task prompt is needed. - prompt=built_prompt, - tools=( - [tool.tool_definition() for tool in tools] or None - if using_tool_calling_llm - else None - ), - tool_choice=( - "required" - if tools and force_use_tool.force_use and using_tool_calling_llm - else None - ), - structured_response_format=structured_response_format, - ) - - tool_message = process_llm_stream( - stream, - should_stream_answer - and not agent_config.behavior.skip_gen_ai_answer_generation, - writer, - ind=0, - ).ai_message_chunk - - if tool_message is None: - raise ValueError("No tool message emitted by LLM") - - # If no tool calls are emitted by the LLM, we should not choose a tool - if len(tool_message.tool_calls) == 0: - logger.debug("No tool calls emitted by LLM") - return ToolChoiceUpdate( - tool_choice=None, - ) - - # TODO: here we could handle parallel tool calls. Right now - # we just pick the first one that matches. - selected_tool: Tool | None = None - selected_tool_call_request: ToolCall | None = None - for tool_call_request in tool_message.tool_calls: - known_tools_by_name = [ - tool for tool in tools if tool.name == tool_call_request["name"] - ] - - if known_tools_by_name: - selected_tool = known_tools_by_name[0] - selected_tool_call_request = tool_call_request - break - - logger.error( - "Tool call requested with unknown name field. \n" - f"tools: {tools}" - f"tool_call_request: {tool_call_request}" - ) - - if not selected_tool or not selected_tool_call_request: - raise ValueError( - f"Tool call attempted with tool {selected_tool}, request {selected_tool_call_request}" - ) - - logger.debug(f"Selected tool: {selected_tool.name}") - logger.debug(f"Selected tool call request: {selected_tool_call_request}") - - if embedding_thread and selected_tool.name == SearchTool._NAME: - # Wait for the embedding thread to finish - embedding = wait_on_background(embedding_thread) - override_kwargs.precomputed_query_embedding = embedding - if keyword_thread and selected_tool.name == SearchTool._NAME: - is_keyword, keywords = wait_on_background(keyword_thread) - override_kwargs.precomputed_is_keyword = is_keyword - override_kwargs.precomputed_keywords = keywords - - if ( - selected_tool.name == SearchTool._NAME - and expanded_keyword_thread - and expanded_semantic_thread - ): - - override_kwargs.expanded_queries = _expand_query_non_tool_calling_llm( - expanded_keyword_thread=expanded_keyword_thread, - expanded_semantic_thread=expanded_semantic_thread, - ) - if ( - USE_SEMANTIC_KEYWORD_EXPANSIONS_BASIC_SEARCH - and selected_tool.name == SearchTool._NAME - and override_kwargs.expanded_queries - ): - # TODO: this is a hack to handle the case where the expanded queries are not found. - # We should refactor this to be more robust. - if ( - override_kwargs.expanded_queries.keywords_expansions is None - or override_kwargs.expanded_queries.semantic_expansions is None - ): - raise ValueError("No expanded keyword or semantic threads found.") - - return ToolChoiceUpdate( - tool_choice=ToolChoice( - tool=selected_tool, - tool_args=selected_tool_call_request["args"], - id=selected_tool_call_request["id"], - search_tool_override_kwargs=override_kwargs, - ), - ) diff --git a/backend/onyx/agents/agent_search/orchestration/nodes/prepare_tool_input.py b/backend/onyx/agents/agent_search/orchestration/nodes/prepare_tool_input.py deleted file mode 100644 index cd694e07d19..00000000000 --- a/backend/onyx/agents/agent_search/orchestration/nodes/prepare_tool_input.py +++ /dev/null @@ -1,17 +0,0 @@ -from typing import Any -from typing import cast - -from langchain_core.runnables.config import RunnableConfig - -from onyx.agents.agent_search.models import GraphConfig -from onyx.agents.agent_search.orchestration.states import ToolChoiceInput - - -def prepare_tool_input(state: Any, config: RunnableConfig) -> ToolChoiceInput: - agent_config = cast(GraphConfig, config["metadata"]["config"]) - return ToolChoiceInput( - # NOTE: this node is used at the top level of the agent, so we always stream - should_stream_answer=True, - prompt_snapshot=None, # uses default prompt builder - tools=[tool.name for tool in (agent_config.tooling.tools or [])], - ) diff --git a/backend/onyx/agents/agent_search/shared_graph_utils/llm.py b/backend/onyx/agents/agent_search/shared_graph_utils/llm.py index 7210312bf53..6c351c5689e 100644 --- a/backend/onyx/agents/agent_search/shared_graph_utils/llm.py +++ b/backend/onyx/agents/agent_search/shared_graph_utils/llm.py @@ -5,6 +5,7 @@ from typing import Type from typing import TypeVar +from braintrust import traced from langchain.schema.language_model import LanguageModelInput from langchain_core.messages import HumanMessage from langgraph.types import StreamWriter @@ -27,6 +28,7 @@ JSON_PATTERN = re.compile(r"```(?:json)?\s*(\{.*?\})\s*```", re.DOTALL) +@traced(name="stream llm", type="llm") def stream_llm_answer( llm: LLM, prompt: LanguageModelInput, diff --git a/backend/onyx/chat/process_message.py b/backend/onyx/chat/process_message.py index c6542f35251..cded1a075cd 100644 --- a/backend/onyx/chat/process_message.py +++ b/backend/onyx/chat/process_message.py @@ -9,7 +9,6 @@ from sqlalchemy.orm import Session -from onyx.agents.agent_search.orchestration.nodes.call_tool import ToolCallException from onyx.chat.answer import Answer from onyx.chat.chat_utils import create_chat_chain from onyx.chat.chat_utils import create_temporary_persona @@ -113,6 +112,10 @@ ERROR_TYPE_CANCELLED = "cancelled" +class ToolCallException(Exception): + """Exception raised for errors during tool calls.""" + + class PartialResponse(Protocol): def __call__( self, diff --git a/backend/onyx/llm/interfaces.py b/backend/onyx/llm/interfaces.py index a4290b929ac..357a04670b4 100644 --- a/backend/onyx/llm/interfaces.py +++ b/backend/onyx/llm/interfaces.py @@ -121,7 +121,6 @@ def _invoke_implementation( ) -> BaseMessage: raise NotImplementedError - @traced(name="stream llm", type="llm") def stream( self, prompt: LanguageModelInput, diff --git a/backend/onyx/secondary_llm_flows/query_validation.py b/backend/onyx/secondary_llm_flows/query_validation.py deleted file mode 100644 index 514c2d5e103..00000000000 --- a/backend/onyx/secondary_llm_flows/query_validation.py +++ /dev/null @@ -1,136 +0,0 @@ -# NOTE No longer used. This needs to be revisited later. -import re -from collections.abc import Iterator - -from onyx.chat.models import OnyxAnswerPiece -from onyx.chat.models import StreamingError -from onyx.llm.exceptions import GenAIDisabledException -from onyx.llm.factory import get_default_llms -from onyx.llm.utils import dict_based_prompt_to_langchain_prompt -from onyx.llm.utils import message_generator_to_string_generator -from onyx.llm.utils import message_to_string -from onyx.prompts.constants import ANSWERABLE_PAT -from onyx.prompts.constants import THOUGHT_PAT -from onyx.prompts.query_validation import ANSWERABLE_PROMPT -from onyx.server.query_and_chat.models import QueryValidationResponse -from onyx.server.utils import get_json_line -from onyx.utils.logger import setup_logger - -logger = setup_logger() - - -def get_query_validation_messages(user_query: str) -> list[dict[str, str]]: - messages = [ - { - "role": "user", - "content": ANSWERABLE_PROMPT.format(user_query=user_query), - }, - ] - - return messages - - -def extract_answerability_reasoning(model_raw: str) -> str: - reasoning_match = re.search( - f"{THOUGHT_PAT.upper()}(.*?){ANSWERABLE_PAT.upper()}", model_raw, re.DOTALL - ) - reasoning_text = reasoning_match.group(1).strip() if reasoning_match else "" - return reasoning_text - - -def extract_answerability_bool(model_raw: str) -> bool: - answerable_match = re.search(f"{ANSWERABLE_PAT.upper()}(.+)", model_raw) - answerable_text = answerable_match.group(1).strip() if answerable_match else "" - answerable = True if answerable_text.strip().lower() in ["true", "yes"] else False - return answerable - - -def get_query_answerability( - user_query: str, skip_check: bool = False -) -> tuple[str, bool]: - if skip_check: - return "Query Answerability Evaluation feature is turned off", True - - try: - llm, _ = get_default_llms() - except GenAIDisabledException: - return "Generative AI is turned off - skipping check", True - - messages = get_query_validation_messages(user_query) - filled_llm_prompt = dict_based_prompt_to_langchain_prompt(messages) - model_output = message_to_string(llm.invoke(filled_llm_prompt)) - - reasoning = extract_answerability_reasoning(model_output) - answerable = extract_answerability_bool(model_output) - - return reasoning, answerable - - -def stream_query_answerability( - user_query: str, skip_check: bool = False -) -> Iterator[str]: - if skip_check: - yield get_json_line( - QueryValidationResponse( - reasoning="Query Answerability Evaluation feature is turned off", - answerable=True, - ).model_dump() - ) - return - - try: - llm, _ = get_default_llms() - except GenAIDisabledException: - yield get_json_line( - QueryValidationResponse( - reasoning="Generative AI is turned off - skipping check", - answerable=True, - ).model_dump() - ) - return - messages = get_query_validation_messages(user_query) - filled_llm_prompt = dict_based_prompt_to_langchain_prompt(messages) - try: - tokens = message_generator_to_string_generator(llm.stream(filled_llm_prompt)) - reasoning_pat_found = False - model_output = "" - hold_answerable = "" - for token in tokens: - model_output = model_output + token - - if ANSWERABLE_PAT.upper() in model_output: - continue - - if not reasoning_pat_found and THOUGHT_PAT.upper() in model_output: - reasoning_pat_found = True - reason_ind = model_output.find(THOUGHT_PAT.upper()) - remaining = model_output[reason_ind + len(THOUGHT_PAT.upper()) :] - if remaining: - yield get_json_line( - OnyxAnswerPiece(answer_piece=remaining).model_dump() - ) - continue - - if reasoning_pat_found: - hold_answerable = hold_answerable + token - if hold_answerable == ANSWERABLE_PAT.upper()[: len(hold_answerable)]: - continue - yield get_json_line( - OnyxAnswerPiece(answer_piece=hold_answerable).model_dump() - ) - hold_answerable = "" - - reasoning = extract_answerability_reasoning(model_output) - answerable = extract_answerability_bool(model_output) - - yield get_json_line( - QueryValidationResponse( - reasoning=reasoning, answerable=answerable - ).model_dump() - ) - except Exception as e: - # exception is logged in the answer_question method, no need to re-log - error = StreamingError(error=str(e)) - yield get_json_line(error.model_dump()) - logger.exception("Failed to validate Query") - return