Skip to content

Commit 2ea29a4

Browse files
lievanlievan
authored and committed
fix(llmobs): bedrock converse tracing is resistant to modifying the stream (#13659)
Fixes an issue where modifying chunks returned from a bedrock stream impacted our tracing of those chunks. Instead, our tracing should reflect the original response returned regardless of whether or not it was modified. This is relevant when libraries like langchain delete data from the raw streamed response (e.g. [popping](https://github.com/langchain-ai/langchain-aws/blob/40abb584979a349019d89bbf1cba7d8c56d23664/libs/aws/langchain_aws/chat_models/bedrock_converse.py#L995) `usageMetadata`). The fix uses an approach where we immediately process stream chunks as they are iterated over, instead of waiting until the entire stream has finished. The data flow is like this: 1. a streamed chunk is read from `TracedBotocoreConverseStream` 2. we send that chunk to `_output_stream_processor`, which reads all the relevant data from that chunk and builds the final output messages, token usage, metadata. This should block until we reach the next yield, at which point we've read all the data we needed for this chunk. The parsing logic is **unchanged** from the previous helper we used to parse the stream stream, except this method is now a generator. 4. 
we yield the chunk back to the user In terms of manual testing, I've verified that it fixes the langchain x bedrock converse streaming issue <img width="908" alt="image" src="https://github.com/user-attachments/assets/e4d1e4e5-3d1c-4f2d-a822-f7c7cdb169a6" /> If this logic looks good, it may be a pattern we will want to implement for our other integrations as well ## Checklist - [ ] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [ ] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --------- Co-authored-by: lievan <evan.li@datadoghq.com>
1 parent 4f1d6f1 commit 2ea29a4

File tree

4 files changed

+87
-13
lines changed

4 files changed

+87
-13
lines changed

ddtrace/contrib/internal/botocore/services/bedrock.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,15 @@ class TracedBotocoreConverseStream(wrapt.ObjectProxy):
111111

112112
def __init__(self, wrapped, ctx: core.ExecutionContext):
113113
super().__init__(wrapped)
114-
self._stream_chunks = []
115114
self._execution_ctx = ctx
116115

117116
def __iter__(self):
117+
stream_processor = self._execution_ctx["bedrock_integration"]._converse_output_stream_processor()
118118
exception_raised = False
119119
try:
120+
next(stream_processor)
120121
for chunk in self.__wrapped__:
121-
self._stream_chunks.append(chunk)
122+
stream_processor.send(chunk)
122123
yield chunk
123124
except Exception:
124125
core.dispatch("botocore.patched_bedrock_api_call.exception", [self._execution_ctx, sys.exc_info()])
@@ -127,7 +128,7 @@ def __iter__(self):
127128
finally:
128129
if exception_raised:
129130
return
130-
core.dispatch("botocore.bedrock.process_response_converse", [self._execution_ctx, self._stream_chunks])
131+
core.dispatch("botocore.bedrock.process_response_converse", [self._execution_ctx, stream_processor])
131132

132133

133134
def safe_token_count(token_count) -> Optional[int]:

ddtrace/llmobs/_integrations/bedrock.py

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from typing import Any
22
from typing import Dict
3+
from typing import Generator
34
from typing import List
45
from typing import Optional
56
from typing import Tuple
@@ -94,11 +95,18 @@ def _llmobs_set_tags(
9495
if ctx["resource"] == "Converse":
9596
output_messages = self._extract_output_message_for_converse(response)
9697
elif ctx["resource"] == "ConverseStream":
97-
(
98-
output_messages,
99-
additional_metadata,
100-
streamed_usage_metrics,
101-
) = self._extract_output_message_for_converse_stream(response)
98+
"""
99+
At this point, we signal to `_converse_output_stream_processor` that we're done with the stream
100+
and ready to get the final results. This causes `_converse_output_stream_processor` to break out of the
101+
while loop, do some final processing, and return the final results.
102+
"""
103+
try:
104+
response.send(None)
105+
except StopIteration as e:
106+
output_messages, additional_metadata, streamed_usage_metrics = e.value
107+
finally:
108+
response.close()
109+
102110
metadata.update(additional_metadata)
103111
usage_metrics.update(streamed_usage_metrics)
104112
else:
@@ -205,11 +213,16 @@ def _extract_output_message_for_converse(response: Dict[str, Any]):
205213
return get_messages_from_converse_content(role, content)
206214

207215
@staticmethod
208-
def _extract_output_message_for_converse_stream(
209-
streamed_body: List[Dict[str, Any]]
210-
) -> Tuple[List[Dict[str, Any]], Dict[str, str], Dict[str, int]]:
216+
def _converse_output_stream_processor() -> (
217+
Generator[
218+
None,
219+
Dict[str, Any],
220+
Tuple[List[Dict[str, Any]], Dict[str, str], Dict[str, int]],
221+
]
222+
):
211223
"""
212-
Extract output messages from streamed converse responses.
224+
Listens for output chunks from a converse streamed response and builds a
225+
list of output messages, usage metrics, and metadata.
213226
214227
Converse stream response comes in chunks. The chunks we care about are:
215228
- a message start/stop event, or
@@ -229,7 +242,9 @@ def _extract_output_message_for_converse_stream(
229242

230243
current_message: Optional[Dict[str, Any]] = None
231244

232-
for chunk in streamed_body:
245+
chunk = yield
246+
247+
while chunk is not None:
233248
if "metadata" in chunk and "usage" in chunk["metadata"]:
234249
usage = chunk["metadata"]["usage"]
235250
for token_type in ("input", "output", "total"):
@@ -276,6 +291,8 @@ def _extract_output_message_for_converse_stream(
276291
)
277292
current_message = None
278293

294+
chunk = yield
295+
279296
# Handle the case where we didn't receive an explicit message stop event
280297
if current_message is not None and current_message.get("content_block_indicies"):
281298
messages.append(
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
LLM Observability: This fix resolves an issue where modifying bedrock converse streamed chunks caused traced spans to show modified content.

tests/contrib/botocore/test_bedrock_llmobs.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,3 +349,55 @@ def test_llmobs_converse_stream(cls, bedrock_client, request_vcr, mock_tracer, l
349349
},
350350
tags={"service": "aws.bedrock-runtime", "ml_app": "<ml-app-name>"},
351351
)
352+
353+
@pytest.mark.skipif(BOTO_VERSION < (1, 34, 131), reason="Converse API not available until botocore 1.34.131")
354+
def test_llmobs_converse_modified_stream(cls, bedrock_client, request_vcr, mock_tracer, llmobs_events):
355+
"""
356+
Verify that LLM Obs tracing works even if stream chunks are modified mid-stream.
357+
"""
358+
output_msg = ""
359+
request_params = create_bedrock_converse_request(**bedrock_converse_args_with_system_and_tool)
360+
with request_vcr.use_cassette("bedrock_converse_stream.yaml"):
361+
response = bedrock_client.converse_stream(**request_params)
362+
for chunk in response["stream"]:
363+
if "contentBlockDelta" in chunk and "delta" in chunk["contentBlockDelta"]:
364+
if "text" in chunk["contentBlockDelta"]["delta"]:
365+
output_msg += chunk["contentBlockDelta"]["delta"]["text"]
366+
# delete keys from streamed chunk
367+
[chunk.pop(key) for key in list(chunk.keys())]
368+
369+
span = mock_tracer.pop_traces()[0][0]
370+
assert len(llmobs_events) == 1
371+
372+
assert llmobs_events[0] == _expected_llmobs_llm_span_event(
373+
span,
374+
model_name="claude-3-sonnet-20240229-v1:0",
375+
model_provider="anthropic",
376+
input_messages=[
377+
{"role": "system", "content": request_params.get("system")[0]["text"]},
378+
{"role": "user", "content": request_params.get("messages")[0].get("content")[0].get("text")},
379+
],
380+
output_messages=[
381+
{
382+
"role": "assistant",
383+
"content": output_msg,
384+
"tool_calls": [
385+
{
386+
"arguments": {"concept": "distributed tracing"},
387+
"name": "fetch_concept",
388+
"tool_id": mock.ANY,
389+
}
390+
],
391+
}
392+
],
393+
metadata={
394+
"temperature": request_params.get("inferenceConfig", {}).get("temperature"),
395+
"max_tokens": request_params.get("inferenceConfig", {}).get("maxTokens"),
396+
},
397+
token_metrics={
398+
"input_tokens": 259,
399+
"output_tokens": 64,
400+
"total_tokens": 323,
401+
},
402+
tags={"service": "aws.bedrock-runtime", "ml_app": "<ml-app-name>"},
403+
)

0 commit comments

Comments
 (0)