feat: add Bedrock InvokeModelWithResponseStream instrumentation #2845

Open · wants to merge 5 commits into main
plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts
@@ -101,7 +101,9 @@
case 'ConverseStream':
return this.requestPreSpanHookConverse(request, config, diag, true);
      case 'InvokeModel':
-        return this.requestPreSpanHookInvokeModel(request, config, diag);
+        return this.requestPreSpanHookInvokeModel(request, config, diag, false);
+      case 'InvokeModelWithResponseStream':
+        return this.requestPreSpanHookInvokeModel(request, config, diag, true);
}

return {
@@ -157,7 +159,8 @@
private requestPreSpanHookInvokeModel(
request: NormalizedRequest,
config: AwsSdkInstrumentationConfig,
-    diag: DiagLogger
+    diag: DiagLogger,
+    isStream: boolean
): RequestMetadata {
let spanName: string | undefined;
const spanAttributes: Attributes = {
@@ -312,6 +315,7 @@
return {
spanName,
isIncoming: false,
isStream,
spanAttributes,
};
}
@@ -346,6 +350,13 @@
);
case 'InvokeModel':
return this.responseHookInvokeModel(response, span, tracer, config);
case 'InvokeModelWithResponseStream':
return this.responseHookInvokeModelWithResponseStream(
response,
span,
tracer,
config
);
}
}

@@ -579,4 +590,145 @@
}
}
}

private async responseHookInvokeModelWithResponseStream(
response: NormalizedResponse,
span: Span,
tracer: Tracer,
config: AwsSdkInstrumentationConfig
): Promise<any> {
const stream = response.data?.body;
const modelId = response.request.commandInput?.modelId;
if (!stream || !modelId) return;

const wrappedStream =
BedrockRuntimeServiceExtension.instrumentAsyncIterable(
stream,
async (chunk: { chunk?: { bytes?: Uint8Array } }) => {
const parsedChunk = BedrockRuntimeServiceExtension.parseChunk(
chunk?.chunk?.bytes
);

if (!parsedChunk) return;

if (modelId.includes('amazon.titan')) {
BedrockRuntimeServiceExtension.recordTitanAttributes(
parsedChunk,
span
);
} else if (modelId.includes('anthropic.claude')) {
BedrockRuntimeServiceExtension.recordClaudeAttributes(
parsedChunk,
span
);
} else if (modelId.includes('amazon.nova')) {
BedrockRuntimeServiceExtension.recordNovaAttributes(
parsedChunk,
span
);
}
Review comment (Contributor):

The non-stream responseHookInvokeModel() adds attributes for a few more models ('meta.llama', 'cohere.command-r', etc.). Should this function also add the relevant attributes for those models, or is it possible streaming is not supported for them?

Also, perhaps the recordNovaAttributes(), recordClaudeAttributes(), etc. methods could be used by both responseHookInvokeModel and responseHookInvokeModelWithResponseStream. Or perhaps all the record*Attributes() methods could be moved to one setInvokeModelResponseAttributes() that is used by both responseHookInvokeModel*() methods.
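
A minimal sketch of that consolidation (the setInvokeModelResponseAttributes name and the normalized usage shape are illustrative, not part of this PR):

private static setInvokeModelResponseAttributes(
  span: Span,
  usage: { inputTokens?: number; outputTokens?: number },
  finishReason?: string
) {
  if (usage.inputTokens !== undefined) {
    span.setAttribute(ATTR_GEN_AI_USAGE_INPUT_TOKENS, usage.inputTokens);
  }
  if (usage.outputTokens !== undefined) {
    span.setAttribute(ATTR_GEN_AI_USAGE_OUTPUT_TOKENS, usage.outputTokens);
  }
  if (finishReason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [finishReason]);
  }
}

// Each model-specific recorder then reduces to field extraction, e.g.:
private static recordTitanAttributes(parsedChunk: any, span: Span) {
  BedrockRuntimeServiceExtension.setInvokeModelResponseAttributes(
    span,
    {
      inputTokens: parsedChunk.inputTextTokenCount,
      outputTokens: parsedChunk.totalOutputTextTokenCount,
    },
    parsedChunk.completionReason
  );
}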

}
);
// Replace the original response body with our instrumented stream.
// span.end() is deferred until the entire stream has been consumed,
// so downstream consumers still receive the full stream while
// OpenTelemetry records span attributes from the streamed data.
response.data.body = (async function* () {
try {
for await (const item of wrappedStream) {
Review comment (Contributor):

IIUC, the result here is that two async iterators are created. Could the chunk handling above be moved into this for await ... and then not bother having a wrappedStream at all? I haven't tried this.
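
A rough sketch of that single-iterator variant (untested; it reuses the surrounding stream, modelId, and span bindings and drops instrumentAsyncIterable entirely):

response.data.body = (async function* () {
  try {
    for await (const chunk of stream) {
      const parsedChunk = BedrockRuntimeServiceExtension.parseChunk(
        chunk?.chunk?.bytes
      );
      if (parsedChunk) {
        // Same per-model dispatch as in the wrapped-stream version above.
        if (modelId.includes('amazon.titan')) {
          BedrockRuntimeServiceExtension.recordTitanAttributes(parsedChunk, span);
        } else if (modelId.includes('anthropic.claude')) {
          BedrockRuntimeServiceExtension.recordClaudeAttributes(parsedChunk, span);
        } else if (modelId.includes('amazon.nova')) {
          BedrockRuntimeServiceExtension.recordNovaAttributes(parsedChunk, span);
        }
      }
      yield chunk;
    }
  } finally {
    span.end();
  }
})();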

yield item;
}
} finally {
span.end();
}
})();
return response.data;
}
// Tap into the stream at the chunk level without modifying the chunk itself.
private static instrumentAsyncIterable<T>(
stream: AsyncIterable<T>,
onChunk: (chunk: T) => void
): AsyncIterable<T> {
return {
[Symbol.asyncIterator]: async function* () {
for await (const chunk of stream) {
onChunk(chunk);
yield chunk;
}
},
};
}

private static parseChunk(bytes?: Uint8Array): any {
if (!bytes || !(bytes instanceof Uint8Array)) return null;
try {
const str = Buffer.from(bytes).toString('utf-8');
return JSON.parse(str);
} catch (err) {
console.warn('Failed to parse streamed chunk', err);
Review comment (Contributor):

Shouldn't use console in instrumentation code. Instead use the DiagLogger that every instrumentation instance has.

Suggested change:
-      console.warn('Failed to parse streamed chunk', err);
+      this._diag.warn('Failed to parse streamed chunk', err);

This will mean this method can no longer be static.
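
A minimal sketch of the resulting instance method (assuming the extension holds a _diag DiagLogger field, which this PR does not add):

private parseChunk(bytes?: Uint8Array): any {
  if (!bytes || !(bytes instanceof Uint8Array)) return null;
  try {
    return JSON.parse(Buffer.from(bytes).toString('utf-8'));
  } catch (err) {
    // _diag is an assumed instance field, e.g. captured from the request hook.
    this._diag.warn('Failed to parse streamed chunk', err);
    return null;
  }
}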

return null;

Codecov / codecov/patch warning: added lines #L668 - L669 in plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts were not covered by tests.
}
}

private static recordNovaAttributes(parsedChunk: any, span: Span) {
if (parsedChunk.metadata?.usage !== undefined) {
if (parsedChunk.metadata?.usage.inputTokens !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_INPUT_TOKENS,
parsedChunk.metadata.usage.inputTokens
);
}
if (parsedChunk.metadata?.usage.outputTokens !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
parsedChunk.metadata.usage.outputTokens
);
}
}
if (parsedChunk.messageStop?.stopReason !== undefined) {
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
parsedChunk.messageStop.stopReason,
]);
}
}

private static recordClaudeAttributes(parsedChunk: any, span: Span) {
if (parsedChunk.message?.usage?.input_tokens !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_INPUT_TOKENS,
parsedChunk.message.usage.input_tokens
);
}
if (parsedChunk.message?.usage?.output_tokens !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
parsedChunk.message.usage.output_tokens
);
}
if (parsedChunk.delta?.stop_reason !== undefined) {
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
parsedChunk.delta.stop_reason,
]);
}
}

private static recordTitanAttributes(parsedChunk: any, span: Span) {
if (parsedChunk.inputTextTokenCount !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_INPUT_TOKENS,
parsedChunk.inputTextTokenCount
);
}
if (parsedChunk.totalOutputTextTokenCount !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
parsedChunk.totalOutputTextTokenCount
);
}
if (parsedChunk.completionReason !== undefined) {
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
parsedChunk.completionReason,
]);
}
}
}
Test file
@@ -36,6 +36,7 @@ import {
ConverseStreamCommand,
ConversationRole,
InvokeModelCommand,
InvokeModelWithResponseStreamCommand,
} from '@aws-sdk/client-bedrock-runtime';
import { AwsCredentialIdentity } from '@aws-sdk/types';
import * as path from 'path';
@@ -79,6 +80,7 @@ const sanitizeRecordings = (scopes: Definition[]) => {
describe('Bedrock', () => {
nockBack.fixtures = path.join(__dirname, 'mock-responses');
let credentials: AwsCredentialIdentity | undefined;

if (nockBack.currentMode === 'dryrun') {
credentials = {
accessKeyId: 'testing',
@@ -642,4 +644,184 @@ describe('Bedrock', () => {
});
});
});

describe('InvokeModelWithResponseStream', () => {
it('adds amazon titan model attributes to span', async () => {
const modelId = 'amazon.titan-text-lite-v1';
const prompt = '\n\nHuman: Hello, How are you today? \n\nAssistant:';

const body = {
inputText: prompt,
textGenerationConfig: {
maxTokenCount: 10,
temperature: 0.8,
topP: 1,
stopSequences: ['|'],
},
};
const command = new InvokeModelWithResponseStreamCommand({
modelId,
body: JSON.stringify(body),
contentType: 'application/json',
accept: 'application/json',
});

const response = await client.send(command);

let collectedText = '';
if (!response.body) return;
for await (const chunk of response.body) {
if (chunk?.chunk?.bytes instanceof Uint8Array) {
const parsed = JSON.parse(decodeChunk(chunk));
collectedText += parsed.outputText;
}
}
expect(collectedText).toBe(" Hello there! I'm doing well. Thank you");

const invokeModelSpans: ReadableSpan[] =
getInvokeModelWithResponseStreamSpans();

expect(invokeModelSpans.length).toBe(1);
expect(invokeModelSpans[0].attributes).toMatchObject({
[ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK,
[ATTR_GEN_AI_REQUEST_MODEL]: modelId,
[ATTR_GEN_AI_REQUEST_MAX_TOKENS]: 10,
[ATTR_GEN_AI_REQUEST_TEMPERATURE]: 0.8,
[ATTR_GEN_AI_REQUEST_TOP_P]: 1,
[ATTR_GEN_AI_REQUEST_STOP_SEQUENCES]: ['|'],
[ATTR_GEN_AI_USAGE_INPUT_TOKENS]: 13,
[ATTR_GEN_AI_USAGE_OUTPUT_TOKENS]: 10,
[ATTR_GEN_AI_RESPONSE_FINISH_REASONS]: ['LENGTH'],
});
});
it('adds claude model attributes to span', async () => {
const modelId = 'anthropic.claude-3-5-sonnet-20240620-v1:0';
const prompt = '\n\nHuman: Hello, How are you today? \n\nAssistant:';

const body = {
anthropic_version: 'bedrock-2023-05-31',
max_tokens: 12,
top_k: 250,
stop_sequences: ['|'],
temperature: 0.8,
top_p: 1,
messages: [
{
role: 'user',
content: [
{
type: 'text',
text: prompt,
},
],
},
],
};

const command = new InvokeModelWithResponseStreamCommand({
modelId,
body: JSON.stringify(body),
contentType: 'application/json',
accept: 'application/json',
});

const response = await client.send(command);

let collectedText = '';
if (!response.body) return;
for await (const chunk of response.body) {
if (chunk?.chunk?.bytes instanceof Uint8Array) {
const parsed = JSON.parse(decodeChunk(chunk));
if (
parsed.type === 'content_block_delta' &&
parsed.delta?.type === 'text_delta'
) {
collectedText += parsed.delta.text;
}
}
}

expect(collectedText).toBe(
"Hello! I'm doing well, thank you for asking."
);

const invokeModelSpans: ReadableSpan[] =
getInvokeModelWithResponseStreamSpans();

expect(invokeModelSpans.length).toBe(1);
expect(invokeModelSpans[0].attributes).toMatchObject({
[ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK,
[ATTR_GEN_AI_REQUEST_MODEL]: modelId,
[ATTR_GEN_AI_REQUEST_MAX_TOKENS]: 12,
[ATTR_GEN_AI_REQUEST_TEMPERATURE]: 0.8,
[ATTR_GEN_AI_REQUEST_TOP_P]: 1,
[ATTR_GEN_AI_REQUEST_STOP_SEQUENCES]: ['|'],
[ATTR_GEN_AI_USAGE_INPUT_TOKENS]: 22,
[ATTR_GEN_AI_USAGE_OUTPUT_TOKENS]: 1,
[ATTR_GEN_AI_RESPONSE_FINISH_REASONS]: ['max_tokens'],
});
});

it('adds amazon nova model attributes to span', async () => {
const modelId = 'amazon.nova-pro-v1:0';
const prompt = 'Say this is a test';
const nativeRequest: any = {
messages: [{ role: 'user', content: [{ text: prompt }] }],
inferenceConfig: {
max_new_tokens: 10,
temperature: 0.8,
top_p: 1,
stopSequences: ['|'],
},
};
const command = new InvokeModelWithResponseStreamCommand({
modelId,
body: JSON.stringify(nativeRequest),
});

const response = await client.send(command);

let collectedText = '';
if (!response.body) return;
for await (const chunk of response.body) {
if (chunk?.chunk?.bytes instanceof Uint8Array) {
const parsed = JSON.parse(decodeChunk(chunk));
if (parsed.contentBlockDelta?.delta) {
collectedText += parsed.contentBlockDelta?.delta.text;
}
}
}

expect(collectedText).toBe(
"Certainly! If you're indicating that this interaction"
);

const invokeModelSpans: ReadableSpan[] =
getInvokeModelWithResponseStreamSpans();

expect(invokeModelSpans.length).toBe(1);
expect(invokeModelSpans[0].attributes).toMatchObject({
[ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK,
[ATTR_GEN_AI_REQUEST_MODEL]: modelId,
[ATTR_GEN_AI_REQUEST_MAX_TOKENS]: 10,
[ATTR_GEN_AI_REQUEST_TEMPERATURE]: 0.8,
[ATTR_GEN_AI_REQUEST_TOP_P]: 1,
[ATTR_GEN_AI_REQUEST_STOP_SEQUENCES]: ['|'],
[ATTR_GEN_AI_USAGE_INPUT_TOKENS]: 5,
[ATTR_GEN_AI_USAGE_OUTPUT_TOKENS]: 10,
[ATTR_GEN_AI_RESPONSE_FINISH_REASONS]: ['max_tokens'],
});
});
});

function getInvokeModelWithResponseStreamSpans(): ReadableSpan[] {
return getTestSpans().filter((s: ReadableSpan) => {
return s.name === 'BedrockRuntime.InvokeModelWithResponseStream';
});
}

function decodeChunk(chunk: any) {
return Buffer.from(chunk.chunk.bytes).toString('utf-8');
}
});