
Commit eeb1e84

feat: add Bedrock InvokeModelWithResponseStream instrumentation
1 parent ab438a0 commit eeb1e84

5 files changed: +654 -0 lines changed
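
For context, here is a minimal sketch of the call path this commit instruments, assuming `@opentelemetry/instrumentation-aws-sdk` and `@aws-sdk/client-bedrock-runtime` (v3) are installed; the region, model id, and request body are illustrative, not part of the commit:

import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { AwsInstrumentation } from '@opentelemetry/instrumentation-aws-sdk';
import {
  BedrockRuntimeClient,
  InvokeModelWithResponseStreamCommand,
} from '@aws-sdk/client-bedrock-runtime';

// Register the instrumentation before the SDK client is constructed.
const provider = new NodeTracerProvider();
provider.register();
registerInstrumentations({ instrumentations: [new AwsInstrumentation()] });

async function main() {
  const client = new BedrockRuntimeClient({ region: 'us-east-1' });
  const response = await client.send(
    new InvokeModelWithResponseStreamCommand({
      modelId: 'anthropic.claude-3-haiku-20240307-v1:0', // illustrative model id
      contentType: 'application/json',
      accept: 'application/json',
      body: JSON.stringify({
        anthropic_version: 'bedrock-2023-05-31',
        max_tokens: 256,
        messages: [{ role: 'user', content: 'Hello' }],
      }),
    })
  );
  // response.body is an async iterable of events shaped like
  // { chunk?: { bytes?: Uint8Array } }. Draining it is what lets the
  // instrumentation in this commit record usage attributes and end the span.
  for await (const event of response.body!) {
    if (event.chunk?.bytes) {
      process.stdout.write(Buffer.from(event.chunk.bytes).toString('utf-8'));
    }
  }
}

main().catch(console.error);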

plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts

Lines changed: 224 additions & 0 deletions
@@ -102,6 +102,13 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
         return this.requestPreSpanHookConverse(request, config, diag, true);
       case 'InvokeModel':
         return this.requestPreSpanHookInvokeModel(request, config, diag);
+      case 'InvokeModelWithResponseStream':
+        return this.requestPreSpanHookInvokeModelWithResponseStream(
+          request,
+          config,
+          diag,
+          true
+        );
     }

     return {
@@ -316,6 +323,86 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
     };
   }

+  private requestPreSpanHookInvokeModelWithResponseStream(
+    request: NormalizedRequest,
+    config: AwsSdkInstrumentationConfig,
+    diag: DiagLogger,
+    isStream: boolean
+  ): RequestMetadata {
+    let spanName: string | undefined;
+    const spanAttributes: Attributes = {
+      [ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK,
+      // add operation name for InvokeModelWithResponseStream API
+    };
+
+    const modelId = request.commandInput?.modelId;
+    if (modelId) {
+      spanAttributes[ATTR_GEN_AI_REQUEST_MODEL] = modelId;
+    }
+
+    // Guard on modelId as well: the provider checks below dereference it.
+    if (modelId && request.commandInput?.body) {
+      const requestBody = JSON.parse(request.commandInput.body);
+      if (modelId.includes('amazon.titan')) {
+        if (requestBody.textGenerationConfig?.temperature !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_TEMPERATURE] =
+            requestBody.textGenerationConfig.temperature;
+        }
+        if (requestBody.textGenerationConfig?.topP !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_TOP_P] =
+            requestBody.textGenerationConfig.topP;
+        }
+        if (requestBody.textGenerationConfig?.maxTokenCount !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_MAX_TOKENS] =
+            requestBody.textGenerationConfig.maxTokenCount;
+        }
+        if (requestBody.textGenerationConfig?.stopSequences !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_STOP_SEQUENCES] =
+            requestBody.textGenerationConfig.stopSequences;
+        }
+      } else if (modelId.includes('anthropic.claude')) {
+        if (requestBody.max_tokens !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_MAX_TOKENS] =
+            requestBody.max_tokens;
+        }
+        if (requestBody.temperature !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_TEMPERATURE] =
+            requestBody.temperature;
+        }
+        if (requestBody.top_p !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_TOP_P] = requestBody.top_p;
+        }
+        if (requestBody.stop_sequences !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_STOP_SEQUENCES] =
+            requestBody.stop_sequences;
+        }
+      } else if (modelId.includes('amazon.nova')) {
+        if (requestBody.inferenceConfig?.temperature !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_TEMPERATURE] =
+            requestBody.inferenceConfig.temperature;
+        }
+        if (requestBody.inferenceConfig?.top_p !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_TOP_P] =
+            requestBody.inferenceConfig.top_p;
+        }
+        if (requestBody.inferenceConfig?.max_new_tokens !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_MAX_TOKENS] =
+            requestBody.inferenceConfig.max_new_tokens;
+        }
+        if (requestBody.inferenceConfig?.stopSequences !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_STOP_SEQUENCES] =
+            requestBody.inferenceConfig.stopSequences;
+        }
+      }
+    }
+
+    return {
+      spanName,
+      isIncoming: false,
+      spanAttributes,
+      isStream,
+    };
+  }
+
   responseHook(
     response: NormalizedResponse,
     span: Span,
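
The hook above maps provider-specific request-body fields onto gen_ai.* span attributes. A hedged sketch of representative bodies follows: only the config field names are grounded in the diff; prompt fields and values are illustrative.

// Illustrative InvokeModelWithResponseStream request bodies per provider.
// Comments show the span attribute each field is mapped to above.
const titanBody = {
  inputText: 'Hello',                 // illustrative prompt field
  textGenerationConfig: {
    temperature: 0.5,                 // -> gen_ai.request.temperature
    topP: 0.9,                        // -> gen_ai.request.top_p
    maxTokenCount: 256,               // -> gen_ai.request.max_tokens
    stopSequences: ['###'],           // -> gen_ai.request.stop_sequences
  },
};

const claudeBody = {
  anthropic_version: 'bedrock-2023-05-31', // illustrative
  messages: [{ role: 'user', content: 'Hello' }],
  max_tokens: 256,                    // -> gen_ai.request.max_tokens
  temperature: 0.5,                   // -> gen_ai.request.temperature
  top_p: 0.9,                         // -> gen_ai.request.top_p
  stop_sequences: ['###'],            // -> gen_ai.request.stop_sequences
};

const novaBody = {
  messages: [{ role: 'user', content: [{ text: 'Hello' }] }], // illustrative
  inferenceConfig: {
    temperature: 0.5,                 // -> gen_ai.request.temperature
    top_p: 0.9,                       // -> gen_ai.request.top_p
    max_new_tokens: 256,              // -> gen_ai.request.max_tokens
    stopSequences: ['###'],           // -> gen_ai.request.stop_sequences
  },
};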
@@ -346,6 +433,13 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
         );
       case 'InvokeModel':
         return this.responseHookInvokeModel(response, span, tracer, config);
+      case 'InvokeModelWithResponseStream':
+        return this.responseHookInvokeModelWithResponseStream(
+          response,
+          span,
+          tracer,
+          config
+        );
     }
   }

@@ -579,4 +673,134 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
       }
     }
   }
+
+  private async responseHookInvokeModelWithResponseStream(
+    response: NormalizedResponse,
+    span: Span,
+    tracer: Tracer,
+    config: AwsSdkInstrumentationConfig
+  ): Promise<any> {
+    const stream = response.data?.body;
+    const modelId = response.request.commandInput?.modelId;
+    if (!stream || !span.isRecording()) return;
+
+    const wrappedStream = instrumentAsyncIterable(
+      stream,
+      async (chunk: { chunk?: { bytes?: Uint8Array } }) => {
+        const parsedChunk = parseChunk(chunk?.chunk?.bytes);
+        if (!parsedChunk) return;
+
+        // modelId may be undefined; use optional chaining before dispatching.
+        if (modelId?.includes('amazon.titan')) {
+          recordTitanAttributes(parsedChunk);
+        } else if (modelId?.includes('anthropic.claude')) {
+          recordClaudeAttributes(parsedChunk);
+        } else if (modelId?.includes('amazon.nova')) {
+          recordNovaAttributes(parsedChunk);
+        }
+      }
+    );
+
+    // Replace the original response body with our instrumented stream.
+    // This defers span.end() until the entire stream is consumed, so
+    // downstream consumers still receive the full stream unchanged while
+    // OpenTelemetry records span attributes from the streamed data.
+    response.data.body = (async function* () {
+      try {
+        for await (const item of wrappedStream) {
+          yield item;
+        }
+      } finally {
+        span.end();
+      }
+    })();
+    return response.data;
+
+    // Tap into the stream at the chunk level without modifying the chunks.
+    function instrumentAsyncIterable<T>(
+      stream: AsyncIterable<T>,
+      onChunk: (chunk: T) => void
+    ): AsyncIterable<T> {
+      return {
+        [Symbol.asyncIterator]: async function* () {
+          for await (const chunk of stream) {
+            onChunk(chunk);
+            yield chunk;
+          }
+        },
+      };
+    }
+
+    function parseChunk(bytes?: Uint8Array): any {
+      if (!bytes || !(bytes instanceof Uint8Array)) return null;
+      try {
+        const str = Buffer.from(bytes).toString('utf-8');
+        return JSON.parse(str);
+      } catch (err) {
+        console.warn('Failed to parse streamed chunk', err);
+        return null;
+      }
+    }
+
+    function recordNovaAttributes(parsedChunk: any) {
+      if (parsedChunk.metadata?.usage?.inputTokens !== undefined) {
+        span.setAttribute(
+          ATTR_GEN_AI_USAGE_INPUT_TOKENS,
+          parsedChunk.metadata.usage.inputTokens
+        );
+      }
+      if (parsedChunk.metadata?.usage?.outputTokens !== undefined) {
+        span.setAttribute(
+          ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+          parsedChunk.metadata.usage.outputTokens
+        );
+      }
+      if (parsedChunk.messageStop?.stopReason !== undefined) {
+        span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+          parsedChunk.messageStop.stopReason,
+        ]);
+      }
+    }
+
+    function recordClaudeAttributes(parsedChunk: any) {
+      if (parsedChunk.message?.usage?.input_tokens !== undefined) {
+        span.setAttribute(
+          ATTR_GEN_AI_USAGE_INPUT_TOKENS,
+          parsedChunk.message.usage.input_tokens
+        );
+      }
+      if (parsedChunk.message?.usage?.output_tokens !== undefined) {
+        span.setAttribute(
+          ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+          parsedChunk.message.usage.output_tokens
+        );
+      }
+      if (parsedChunk.delta?.stop_reason !== undefined) {
+        span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+          parsedChunk.delta.stop_reason,
+        ]);
+      }
+    }
+
+    function recordTitanAttributes(parsedChunk: any) {
+      if (parsedChunk.inputTextTokenCount !== undefined) {
+        span.setAttribute(
+          ATTR_GEN_AI_USAGE_INPUT_TOKENS,
+          parsedChunk.inputTextTokenCount
+        );
+      }
+      if (parsedChunk.totalOutputTextTokenCount !== undefined) {
+        span.setAttribute(
+          ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+          parsedChunk.totalOutputTextTokenCount
+        );
+      }
+      if (parsedChunk.completionReason !== undefined) {
+        span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+          parsedChunk.completionReason,
+        ]);
+      }
+    }
+  }
 }
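
For reference, a sketch of the chunk shapes the three record*Attributes helpers expect, after each event's chunk.bytes is JSON-parsed. Only the fields read above are grounded in the diff; event names and surrounding fields are assumptions about the Bedrock stream format, with illustrative values.

// amazon.titan: the final chunk carries token counts and the finish reason.
const titanChunk = {
  outputText: '...',              // illustrative text field
  inputTextTokenCount: 12,        // -> gen_ai.usage.input_tokens
  totalOutputTextTokenCount: 87,  // -> gen_ai.usage.output_tokens
  completionReason: 'FINISH',     // -> gen_ai.response.finish_reasons
};

// anthropic.claude: input/output usage is read from the message object,
// the stop reason from a later delta event.
const claudeStartChunk = {
  type: 'message_start',          // assumed event name
  message: { usage: { input_tokens: 12, output_tokens: 1 } },
};
const claudeDeltaChunk = {
  type: 'message_delta',          // assumed event name
  delta: { stop_reason: 'end_turn' }, // -> gen_ai.response.finish_reasons
};

// amazon.nova: usage rides on a trailing metadata event,
// the stop reason on messageStop.
const novaMetadataChunk = {
  metadata: { usage: { inputTokens: 12, outputTokens: 87 } },
};
const novaStopChunk = {
  messageStop: { stopReason: 'end_turn' }, // -> gen_ai.response.finish_reasons
};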
