
Streaming!!!! #202


Merged: 42 commits, May 7, 2025
Commits (42)
cf7efa1
wip on simple streaming
ewilliams-cloudera May 2, 2025
46da524
simple poc for streaming
ewilliams-cloudera May 5, 2025
52904cb
remove usage from RagChatQueryInput.tsx
ewilliams-cloudera May 5, 2025
5ff5da8
remove stream hypothetical
ewilliams-cloudera May 5, 2025
96be99a
remove unused import
ewilliams-cloudera May 5, 2025
4e8e6f0
wip on doing something once the gen is done
ewilliams-cloudera May 5, 2025
9cb719f
progress on generators
ewilliams-cloudera May 5, 2025
8c25d44
go back to simple streaming only endpoint
ewilliams-cloudera May 5, 2025
d37f50f
Merge remote-tracking branch 'origin/main' into mob/main
jkwatson May 5, 2025
e25408d
wip
jkwatson May 5, 2025
1bd9f5f
add response id on every chunk returned
jkwatson May 5, 2025
4e9b8ef
remove duplicate calls, but still not rendering
jkwatson May 5, 2025
3b91522
getting there
ewilliams-cloudera May 6, 2025
df0203a
Consolidate response_id generation
mliu-cloudera May 6, 2025
cb81801
wip
jkwatson May 6, 2025
469d14b
drop databases
mliu-cloudera May 6, 2025
9c5a008
mob next [ci-skip] [ci skip] [skip ci]
baasitsharief May 6, 2025
29003ee
mob next [ci-skip] [ci skip] [skip ci]
baasitsharief May 6, 2025
603ae73
small refactor
ewilliams-cloudera May 6, 2025
8984ddb
remove deps
ewilliams-cloudera May 6, 2025
cfae208
things are getting close
ewilliams-cloudera May 6, 2025
0a286bc
wip
jkwatson May 6, 2025
6c47c6b
drop databases
mliu-cloudera May 6, 2025
2cf8003
wip
ewilliams-cloudera May 6, 2025
5292878
mob next [ci-skip] [ci skip] [skip ci]
baasitsharief May 6, 2025
c89bebd
drop databases
mliu-cloudera May 6, 2025
d5cca0f
wip
ewilliams-cloudera May 6, 2025
476634f
fixing scrolling
ewilliams-cloudera May 6, 2025
8c8f1a8
only show loading nodes if kb
ewilliams-cloudera May 6, 2025
a06c030
remove unused
ewilliams-cloudera May 6, 2025
4e0fc1f
removing active loading state
ewilliams-cloudera May 6, 2025
9b4087b
fix mypy issues
jkwatson May 6, 2025
a66d489
ruff
jkwatson May 6, 2025
feacd5a
Update release version to dev-testing
actions-user May 6, 2025
87fd117
handle file not found error for summaries when local
ewilliams-cloudera May 6, 2025
3cecc22
remove log
ewilliams-cloudera May 6, 2025
71f5d6c
renaming
ewilliams-cloudera May 7, 2025
6612fa5
better error handling
ewilliams-cloudera May 7, 2025
65ca831
bump bedrock to use max tokens of 1024
ewilliams-cloudera May 7, 2025
14ad9ae
python refactoring
jkwatson May 7, 2025
dd36cc0
mob next [ci-skip] [ci skip] [skip ci]
baasitsharief May 7, 2025
cd960a1
nits
ewilliams-cloudera May 7, 2025
24 changes: 15 additions & 9 deletions llm-service/app/ai/indexing/summary_indexer.py
@@ -218,13 +218,17 @@ def create_storage_context(
@classmethod
def get_all_data_source_summaries(cls) -> dict[str, str]:
root_dir = cls.__persist_root_dir()
# if not os.path.exists(root_dir):
# return {}
storage_context = SummaryIndexer.create_storage_context(
persist_dir=root_dir,
vector_store=SimpleVectorStore(),
)
indices = load_indices_from_storage(storage_context=storage_context, index_ids=None,
try:
storage_context = SummaryIndexer.create_storage_context(
persist_dir=root_dir,
vector_store=SimpleVectorStore(),
)
except FileNotFoundError:
# If the directory doesn't exist, we don't have any summaries.
return {}
indices = load_indices_from_storage(
storage_context=storage_context,
index_ids=None,
**{
"llm": models.LLM.get_noop(),
"response_synthesizer": models.LLM.get_noop(),
@@ -234,11 +238,13 @@ def get_all_data_source_summaries(cls) -> dict[str, str]:
"summary_query": "None",
"data_source_id": 0,
},
)
)
if len(indices) == 0:
return {}

global_summary_store: DocumentSummaryIndex = cast(DocumentSummaryIndex, indices[0])
global_summary_store: DocumentSummaryIndex = cast(
DocumentSummaryIndex, indices[0]
)

summary_ids = global_summary_store.index_struct.doc_id_to_summary_id.values()
nodes = global_summary_store.docstore.get_nodes(list(summary_ids))
2 changes: 1 addition & 1 deletion llm-service/app/routers/index/chat/__init__.py
@@ -42,7 +42,7 @@
from pydantic import BaseModel

from app import exceptions
from app.services.chat import generate_suggested_questions
from app.services.chat.suggested_questions import generate_suggested_questions

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/chat", tags=["Chat"])
64 changes: 59 additions & 5 deletions llm-service/app/routers/index/sessions/__init__.py
@@ -38,15 +38,17 @@
import base64
import json
import logging
from typing import Optional
from typing import Optional, Generator

from fastapi import APIRouter, Header
from fastapi import APIRouter, Header, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from app.services.chat.streaming_chat import stream_chat
from .... import exceptions
from ....rag_types import RagPredictConfiguration
from ....services.chat import (
v2_chat,
from ....services.chat.chat import (
chat as run_chat,
)
from ....services.chat_history.chat_history_manager import (
RagStudioChatMessage,
@@ -100,6 +102,24 @@ def chat_history(
)


@router.get(
"/chat-history/{message_id}",
summary="Returns a specific chat messages for the provided session.",
)
@exceptions.propagates
def get_message_by_id(session_id: int, message_id: str) -> RagStudioChatMessage:
results: list[RagStudioChatMessage] = chat_history_manager.retrieve_chat_history(
session_id=session_id
)
for message in results:
if message.id == message_id:
return message
raise HTTPException(
status_code=404,
detail=f"Message with id {message_id} not found in session {session_id}",
)


@router.delete(
"/chat-history", summary="Deletes the chat history for the provided session."
)
@@ -161,6 +181,10 @@ class RagStudioChatRequest(BaseModel):
configuration: RagPredictConfiguration | None = None


class StreamCompletionRequest(BaseModel):
query: str


def parse_jwt_cookie(jwt_cookie: str | None) -> str:
if jwt_cookie is None:
return "unknown"
@@ -187,4 +211,34 @@ def chat(
session = session_metadata_api.get_session(session_id, user_name=origin_remote_user)

configuration = request.configuration or RagPredictConfiguration()
return v2_chat(session, request.query, configuration, user_name=origin_remote_user)
return run_chat(session, request.query, configuration, user_name=origin_remote_user)


@router.post(
"/stream-completion", summary="Stream completion responses for the given query"
)
@exceptions.propagates
def stream_chat_completion(
session_id: int,
request: RagStudioChatRequest,
origin_remote_user: Optional[str] = Header(None),
) -> StreamingResponse:
session = session_metadata_api.get_session(session_id, user_name=origin_remote_user)
configuration = request.configuration or RagPredictConfiguration()

def generate_stream() -> Generator[str, None, None]:
response_id: str = ""
try:
for response in stream_chat(
session, request.query, configuration, user_name=origin_remote_user
):
print(response)
response_id = response.additional_kwargs["response_id"]
json_delta = json.dumps({"text": response.delta})
yield f"data: {json_delta}" + "\n\n"
yield f'data: {{"response_id" : "{response_id}"}}\n\n'
except Exception as e:
logger.exception("Failed to stream chat completion")
yield f'data: {{"error" : "{e}"}}\n\n'

return StreamingResponse(generate_stream(), media_type="text/event-stream")
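
Note for reviewers: the sketch below is one way to exercise the new streaming endpoint from a client; it is not part of this PR. It assumes the llm-service is reachable at http://localhost:8000 and that this router is mounted under /sessions/{session_id} (both hypothetical; adjust for your deployment). It reads the text/event-stream emitted by stream_chat_completion, printing each text delta and capturing the trailing response_id, which could then be passed to the new GET /chat-history/{message_id} endpoint to fetch the fully persisted message (sources, evaluations) once streaming completes.

# Minimal client sketch for POST /sessions/{session_id}/stream-completion.
# BASE_URL, SESSION_ID, and the /sessions prefix are assumptions, not part of the PR.
import json

import requests

BASE_URL = "http://localhost:8000"  # assumed host/port
SESSION_ID = 1                      # assumed session id


def stream_completion(query: str) -> str:
    """Consume the SSE stream and return the final response_id."""
    response_id = ""
    with requests.post(
        f"{BASE_URL}/sessions/{SESSION_ID}/stream-completion",
        json={"query": query},
        stream=True,
    ) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines(decode_unicode=True):
            if not line or not line.startswith("data: "):
                continue  # skip blank separators between events
            payload = json.loads(line[len("data: "):])
            if "text" in payload:
                print(payload["text"], end="", flush=True)  # streamed delta
            elif "response_id" in payload:
                response_id = payload["response_id"]  # final marker event
            elif "error" in payload:
                raise RuntimeError(payload["error"])
    return response_id


if __name__ == "__main__":
    print("\nresponse_id:", stream_completion("What is in my knowledge base?"))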
38 changes: 38 additions & 0 deletions llm-service/app/services/chat/__init__.py
@@ -0,0 +1,38 @@
#
# CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP)
# (C) Cloudera, Inc. 2025
# All rights reserved.
#
# Applicable Open Source License: Apache 2.0
#
# NOTE: Cloudera open source products are modular software products
# made up of hundreds of individual components, each of which was
# individually copyrighted. Each Cloudera open source product is a
# collective work under U.S. Copyright Law. Your license to use the
# collective work is as provided in your written agreement with
# Cloudera. Used apart from the collective work, this file is
# licensed for your use pursuant to the open source license
# identified above.
#
# This code is provided to you pursuant a written agreement with
# (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
# this code. If you do not have a written agreement with Cloudera nor
# with an authorized and properly licensed third party, you do not
# have any rights to access nor to use this code.
#
# Absent a written agreement with Cloudera, Inc. ("Cloudera") to the
# contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
# KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
# WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
# IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
# FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
# AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
# ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
# OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
# CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
# RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
# DATA.
#

171 changes: 171 additions & 0 deletions llm-service/app/services/chat/chat.py
@@ -0,0 +1,171 @@
#
# CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP)
# (C) Cloudera, Inc. 2025
# All rights reserved.
#
# Applicable Open Source License: Apache 2.0
#
# NOTE: Cloudera open source products are modular software products
# made up of hundreds of individual components, each of which was
# individually copyrighted. Each Cloudera open source product is a
# collective work under U.S. Copyright Law. Your license to use the
# collective work is as provided in your written agreement with
# Cloudera. Used apart from the collective work, this file is
# licensed for your use pursuant to the open source license
# identified above.
#
# This code is provided to you pursuant a written agreement with
# (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
# this code. If you do not have a written agreement with Cloudera nor
# with an authorized and properly licensed third party, you do not
# have any rights to access nor to use this code.
#
# Absent a written agreement with Cloudera, Inc. ("Cloudera") to the
# contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
# KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
# WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
# IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
# FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
# AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
# ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
# OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
# CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
# RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
# DATA.
#

import time
import uuid
from typing import Optional

from fastapi import HTTPException

from app.services import evaluators, llm_completion
from app.services.chat.utils import retrieve_chat_history, format_source_nodes
from app.services.chat_history.chat_history_manager import (
Evaluation,
RagMessage,
RagStudioChatMessage,
chat_history_manager,
)
from app.services.metadata_apis.session_metadata_api import Session
from app.services.mlflow import record_rag_mlflow_run, record_direct_llm_mlflow_run
from app.services.query import querier
from app.services.query.query_configuration import QueryConfiguration
from app.ai.vector_stores.vector_store_factory import VectorStoreFactory
from app.rag_types import RagPredictConfiguration


def chat(
session: Session,
query: str,
configuration: RagPredictConfiguration,
user_name: Optional[str],
) -> RagStudioChatMessage:
query_configuration = QueryConfiguration(
top_k=session.response_chunks,
model_name=session.inference_model,
rerank_model_name=session.rerank_model,
exclude_knowledge_base=configuration.exclude_knowledge_base,
use_question_condensing=configuration.use_question_condensing,
use_hyde=session.query_configuration.enable_hyde,
use_summary_filter=session.query_configuration.enable_summary_filter,
)

response_id = str(uuid.uuid4())

if configuration.exclude_knowledge_base or len(session.data_source_ids) == 0:
return direct_llm_chat(session, response_id, query, user_name)

total_data_sources_size: int = sum(
map(
lambda ds_id: VectorStoreFactory.for_chunks(ds_id).size() or 0,
session.data_source_ids,
)
)
if total_data_sources_size == 0:
return direct_llm_chat(session, response_id, query, user_name)

new_chat_message: RagStudioChatMessage = _run_chat(
session, response_id, query, query_configuration, user_name
)

chat_history_manager.append_to_history(session.id, [new_chat_message])
return new_chat_message


def _run_chat(
session: Session,
response_id: str,
query: str,
query_configuration: QueryConfiguration,
user_name: Optional[str],
) -> RagStudioChatMessage:
if len(session.data_source_ids) != 1:
raise HTTPException(
status_code=400, detail="Only one datasource is supported for chat."
)

data_source_id: int = session.data_source_ids[0]
response, condensed_question = querier.query(
data_source_id,
query,
query_configuration,
retrieve_chat_history(session.id),
)
if condensed_question and (condensed_question.strip() == query.strip()):
condensed_question = None
relevance, faithfulness = evaluators.evaluate_response(
query, response, session.inference_model
)
response_source_nodes = format_source_nodes(response, data_source_id)
new_chat_message = RagStudioChatMessage(
id=response_id,
session_id=session.id,
source_nodes=response_source_nodes,
inference_model=session.inference_model,
rag_message=RagMessage(
user=query,
assistant=response.response,
),
evaluations=[
Evaluation(name="relevance", value=relevance),
Evaluation(name="faithfulness", value=faithfulness),
],
timestamp=time.time(),
condensed_question=condensed_question,
)

record_rag_mlflow_run(
new_chat_message, query_configuration, response_id, session, user_name
)
return new_chat_message


def direct_llm_chat(
session: Session, response_id: str, query: str, user_name: Optional[str]
) -> RagStudioChatMessage:
record_direct_llm_mlflow_run(response_id, session, user_name)

chat_response = llm_completion.completion(
session.id, query, session.inference_model
)
new_chat_message = RagStudioChatMessage(
id=response_id,
session_id=session.id,
source_nodes=[],
inference_model=session.inference_model,
evaluations=[],
rag_message=RagMessage(
user=query,
assistant=str(chat_response.message.content),
),
timestamp=time.time(),
condensed_question=None,
)
chat_history_manager.append_to_history(session.id, [new_chat_message])
return new_chat_message

