-
Notifications
You must be signed in to change notification settings - Fork 592
feat: add Bedrock InvokeModelWithResponseStream instrumentation #2845
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: add Bedrock InvokeModelWithResponseStream instrumentation #2845
Conversation
09f9777
to
eeb1e84
Compare
@@ -102,6 +102,13 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { | |||
return this.requestPreSpanHookConverse(request, config, diag, true); | |||
case 'InvokeModel': | |||
return this.requestPreSpanHookInvokeModel(request, config, diag); | |||
case 'InvokeModelWithResponseStream': | |||
return this.requestPreSpanHookInvokeModelWithResponseStream( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason to not re-use requestPreSpanHookInvokeModel
: add a 4th isStream
argument and pass in false
for 'InvokeModel', true
for 'InvokeModelWithResponseStream', and then make the minor update to the implementation? This is how it was done for 'Converse' and 'ConverseStream'.
It looks to me like the requestPreSpanHookInvokeModel
and requestPreSpanHookInvokeModelWithResponseStream
functions are almost identical ... except that the latter doesn't have blocks for 'meta.llama', 'cohere.*', and 'mistral'.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion, @trentm ! You are absolutely right! I've updated the code to consolidate requestPreSpanHookInvokeModel
and requestPreSpanHookInvokeModelWithResponseStream
into a single method using isStream
parameter as you suggested.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2845 +/- ##
==========================================
- Coverage 89.83% 89.81% -0.02%
==========================================
Files 188 188
Lines 9294 9376 +82
Branches 1907 1938 +31
==========================================
+ Hits 8349 8421 +72
- Misses 945 955 +10
🚀 New features to boost your workflow:
|
eeb1e84
to
0d197f0
Compare
): Promise<any> { | ||
const stream = response.data?.body; | ||
const modelId = response.request.commandInput?.modelId; | ||
if (!stream || !span.isRecording()) return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (!stream || !span.isRecording()) return; | |
if (!stream) return; |
!span.isRecording()
is already checked before responseHookInvokeModelWithResponseStream()
is called
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, @jj22ee! I removed this unnecessary check
return response.data; | ||
|
||
// Tap into the stream at the chunk level without modifying the chunk itself. | ||
function instrumentAsyncIterable<T>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't it be more efficient to declare these following functions outside of this member function scope?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, @jj22ee , for the review and good catches! Yes, these helper functions could be declared outside the method, at the class level, instead of being re-created every time the method is called.
Changed!
fe8416b
to
ff1cb2d
Compare
Update this to pull in the main branch and I can take another look over. Thanks! |
parsedChunk, | ||
span | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The non-stream responseHookInvokeModel()
is adding some attributes for a few more models: 'meta.llama'
, 'cohere.command-r'
, etc.
Should this function also add relevant attributes for those models? Or is it possible streaming is not supported for those models?
Also, perhaps the recordNovaAttributes()
, recordClaudeAttributes()
, etc. methods could be used by both responseHookInvokeModel
and responseHookInvokeModelWithResponseStream
. Or perhaps all the record*Attribute()
methods could be moved to one setInvokeModelResponseAttributes()
that is used by both responseHookInvokeModel*()
methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
// while OpenTelemetry can record span attributes from streamed data. | ||
response.data.body = (async function* () { | ||
try { | ||
for await (const item of wrappedStream) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, the result here is that two async iterators are created. Could the chunk handling above be moved into this for await ...
and then not bother having a wrappedStream
at all? I haven't tried this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
const str = Buffer.from(bytes).toString('utf-8'); | ||
return JSON.parse(str); | ||
} catch (err) { | ||
console.warn('Failed to parse streamed chunk', err); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't use console
in instrumentation code. Instead use the DiagLogger that every intsrumentation instance has.
console.warn('Failed to parse streamed chunk', err); | |
this._diag.warn('Failed to parse streamed chunk', err); |
This will mean you'll need to not use static
for this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, @trentm , for the review . I addressed all your comments and would appreciate next round of a review. Thanks
@yuliia-fryshko do you have bandwidth to work on this? Looks like there are some unaddressed comments that are blocking this from getting merged. |
0e2fef3
to
dcd6b28
Compare
Hi @pichlermarc ! Sorry for the long reply, I just back from the vacation today and I will try to address all comments today |
1941501
to
d86f003
Compare
71c7a6e
to
f7bbec1
Compare
Which problem is this PR solving?
Adds instrumentation of the InvokeModelWithResponseStreamCommand in the AWS Bedrock SDK.
Short description of the changes
instrumentAsyncIterable
is used to inspect streamed chunks in real time and extract relevant telemetry.