From 712953c12ad985505a9e745c08e7a4cff15ec178 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Mon, 7 Jul 2025 15:48:11 +0200 Subject: [PATCH 01/43] save intermediate state --- .../src/orchestration-stream-response.ts | 12 ++++++++ .../orchestration/src/orchestration-stream.ts | 29 ++++++++++++++++++- packages/orchestration/src/util/index.ts | 1 + .../orchestration/src/util/module-results.ts | 16 ++++++++++ 4 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 packages/orchestration/src/util/module-results.ts diff --git a/packages/orchestration/src/orchestration-stream-response.ts b/packages/orchestration/src/orchestration-stream-response.ts index 0dfba796b..c8d812867 100644 --- a/packages/orchestration/src/orchestration-stream-response.ts +++ b/packages/orchestration/src/orchestration-stream-response.ts @@ -2,6 +2,7 @@ import { isMessageToolCall } from './util/index.js'; import type { ToolCallAccumulator } from './util/index.js'; import type { MessageToolCalls, + ModuleResultsStreaming, TokenUsage } from './client/api/schema/index.js'; import type { OrchestrationStream } from './orchestration-stream.js'; @@ -20,6 +21,7 @@ export class OrchestrationStreamResponse { Map > = new Map(); private _stream: OrchestrationStream | undefined; + private _moduleResults: ModuleResultsStreaming = {}; /** * Gets the token usage for the response. @@ -94,6 +96,16 @@ export class OrchestrationStreamResponse { return this._toolCallsAccumulators; } + /** + * @internal + */ + _setModuleResult( + moduleName: K, + result: ModuleResultsStreaming[K] + ): void { + this. 
+ } + get stream(): OrchestrationStream { if (!this._stream) { throw new Error('Response stream is undefined.'); diff --git a/packages/orchestration/src/orchestration-stream.ts b/packages/orchestration/src/orchestration-stream.ts index b6a7fce88..ee4aebc3e 100644 --- a/packages/orchestration/src/orchestration-stream.ts +++ b/packages/orchestration/src/orchestration-stream.ts @@ -1,7 +1,7 @@ import { createLogger } from '@sap-cloud-sdk/util'; import { SseStream } from '@sap-ai-sdk/core'; import { OrchestrationStreamChunkResponse } from './orchestration-stream-chunk-response.js'; -import { mergeToolCallChunk, type ToolCallAccumulator } from './internal.js'; +import { mergeModuleResults, mergeToolCallChunk, type ToolCallAccumulator } from './internal.js'; import type { CompletionPostResponseStreaming } from './client/api/schema/index.js'; import type { HttpResponse } from '@sap-cloud-sdk/http-client'; import type { OrchestrationStreamResponse } from './orchestration-stream-response.js'; @@ -147,6 +147,33 @@ export class OrchestrationStream extends SseStream { } } + /** + * @internal + */ + static async *_processModuleResults( + stream: OrchestrationStream, + response?: OrchestrationStreamResponse + ): AsyncGenerator { + if (!response) { + throw new Error('Response is required to process module results.'); + } + for await (const chunk of stream) { + const moduleResults = chunk.data.module_results; + if (moduleResults) { + for(const [key, value] of Object.entries(moduleResults)) { + if (key in ['llm', 'output_unmasking']) { + const accumulator = response._getModuleResult(key); + const result = mergeModuleResults(value, accumulator.get(key)); + response._setModuleResult(key, result); + } else { + response._setModuleResult(key, value); + } + } + } + yield chunk; + } + + /** * Transform a stream of chunks into a stream of content strings. * @param stream - Orchestration stream. 
diff --git a/packages/orchestration/src/util/index.ts b/packages/orchestration/src/util/index.ts index e605e922f..2431a196b 100644 --- a/packages/orchestration/src/util/index.ts +++ b/packages/orchestration/src/util/index.ts @@ -4,3 +4,4 @@ export * from './module-config.js'; export * from './masking.js'; export * from './translation.js'; export * from './tool-calls.js'; +export * from './module-results.js'; diff --git a/packages/orchestration/src/util/module-results.ts b/packages/orchestration/src/util/module-results.ts new file mode 100644 index 000000000..030dbc5af --- /dev/null +++ b/packages/orchestration/src/util/module-results.ts @@ -0,0 +1,16 @@ +/** + * @internal + */ +export function mergeModuleResults( + ...results: (T | undefined)[] +): T | undefined { + return results.reduce((acc, result) => { + if (result === undefined) { + return acc; + } + if (acc === undefined) { + return result; + } + return { ...acc, ...result }; + }, undefined as T | undefined); +} From 784d2bddfc512cb956b57d83de3cb0fd62a63aad Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Tue, 8 Jul 2025 10:17:15 +0200 Subject: [PATCH 02/43] checkpoint --- .../orchestration/src/orchestration-stream-response.ts | 3 ++- packages/orchestration/src/orchestration-stream.ts | 9 +++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/packages/orchestration/src/orchestration-stream-response.ts b/packages/orchestration/src/orchestration-stream-response.ts index dfd41a9da..3d8b2f2f0 100644 --- a/packages/orchestration/src/orchestration-stream-response.ts +++ b/packages/orchestration/src/orchestration-stream-response.ts @@ -10,6 +10,7 @@ import type { OrchestrationStream } from './orchestration-stream.js'; * Orchestration stream response. */ export class OrchestrationStreamResponse { + public moduleResults: ModuleResultsStreaming = {}; private _usage: TokenUsage | undefined; /** * Finish reasons for all choices. 
@@ -21,7 +22,7 @@ export class OrchestrationStreamResponse { > = new Map(); private _stream: OrchestrationStream | undefined; private _toolCalls: Map = new Map(); - private _moduleResults: ModuleResultsStreaming = {}; + private _contentAccumulators: Map> = new Map(); /** * Gets the token usage for the response. diff --git a/packages/orchestration/src/orchestration-stream.ts b/packages/orchestration/src/orchestration-stream.ts index b23139fab..c9cc7f017 100644 --- a/packages/orchestration/src/orchestration-stream.ts +++ b/packages/orchestration/src/orchestration-stream.ts @@ -2,7 +2,6 @@ import { createLogger } from '@sap-cloud-sdk/util'; import { SseStream } from '@sap-ai-sdk/core'; import { OrchestrationStreamChunkResponse } from './orchestration-stream-chunk-response.js'; import { - mergeModuleResults, isMessageToolCall, mergeToolCallChunk, type ToolCallAccumulator @@ -187,15 +186,17 @@ export class OrchestrationStream extends SseStream { if (moduleResults) { for(const [key, value] of Object.entries(moduleResults)) { if (key in ['llm', 'output_unmasking']) { - const accumulator = response.getModuleResult(key); - const result = mergeModuleResults(value, accumulator.get(key)); - response._setModuleResult(key, result); + const accumulator = response._getContentAccumulator(key); + const result = mergeContentAccumulator(value, accumulator); + response._setContentAccumulator(key, result); } else { response._setModuleResult(key, value); } } } yield chunk; + + mergeContentAccumulators(response); } } From b2da91bfb7764548901f4bca68a7956c2f6ca944 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Tue, 8 Jul 2025 13:36:58 +0200 Subject: [PATCH 03/43] create utility function --- .../src/orchestration-stream-response.ts | 39 ++++++++++++++----- .../orchestration/src/orchestration-stream.ts | 26 ++++++++----- .../orchestration/src/util/module-results.ts | 25 ++++++------ 3 files changed, 59 insertions(+), 31 deletions(-) diff --git 
a/packages/orchestration/src/orchestration-stream-response.ts b/packages/orchestration/src/orchestration-stream-response.ts index 3d8b2f2f0..d9b956614 100644 --- a/packages/orchestration/src/orchestration-stream-response.ts +++ b/packages/orchestration/src/orchestration-stream-response.ts @@ -1,16 +1,23 @@ +import { createLogger } from '@sap-cloud-sdk/util'; import type { ToolCallAccumulator } from './util/index.js'; import type { MessageToolCalls, - ModuleResultsStreaming, + ModuleResults, TokenUsage } from './client/api/schema/index.js'; import type { OrchestrationStream } from './orchestration-stream.js'; +const logger = createLogger({ + package: 'orchestration', + messageContext: 'orchestration-stream-response' +}); + /** * Orchestration stream response. */ export class OrchestrationStreamResponse { - public moduleResults: ModuleResultsStreaming = {}; + private _moduleResults: ModuleResults | undefined; + private _moduleResultsAccumulator: ModuleResults = {}; private _usage: TokenUsage | undefined; /** * Finish reasons for all choices. @@ -22,7 +29,6 @@ export class OrchestrationStreamResponse { > = new Map(); private _stream: OrchestrationStream | undefined; private _toolCalls: Map = new Map(); - private _contentAccumulators: Map> = new Map(); /** * Gets the token usage for the response. @@ -62,6 +68,13 @@ export class OrchestrationStreamResponse { this._finishReasons = finishReasons; } + /** + * @internal + */ + _getToolCallsAccumulators(): Map> { + return this._toolCallsAccumulators; + } + /** * Gets the tool calls for a specific choice index. * @param choiceIndex - The index of the choice to get the tool calls for. 
@@ -81,18 +94,24 @@ export class OrchestrationStreamResponse { /** * @internal */ - _getToolCallsAccumulators(): Map> { - return this._toolCallsAccumulators; + _setModuleResults(moduleResults: ModuleResults): void { + this._moduleResults = moduleResults; + } + + public getModuleResults(): ModuleResults | undefined { + if (this._moduleResults) { + return this._moduleResults; + } + logger.warn( + 'Module results are not set, likely because the stream has not finished yet.' + ); } /** * @internal */ - _setModuleResult( - moduleName: K, - result: ModuleResultsStreaming[K] - ): void { - this._moduleResults[moduleName] = result; + _getModuleResultsAccumulator(): ModuleResults { + return this._moduleResultsAccumulator; } get stream(): OrchestrationStream { diff --git a/packages/orchestration/src/orchestration-stream.ts b/packages/orchestration/src/orchestration-stream.ts index c9cc7f017..316e809d2 100644 --- a/packages/orchestration/src/orchestration-stream.ts +++ b/packages/orchestration/src/orchestration-stream.ts @@ -3,6 +3,7 @@ import { SseStream } from '@sap-ai-sdk/core'; import { OrchestrationStreamChunkResponse } from './orchestration-stream-chunk-response.js'; import { isMessageToolCall, + mergeChoices, mergeToolCallChunk, type ToolCallAccumulator } from './internal.js'; @@ -184,19 +185,26 @@ export class OrchestrationStream extends SseStream { for await (const chunk of stream) { const moduleResults = chunk.data.module_results; if (moduleResults) { - for(const [key, value] of Object.entries(moduleResults)) { - if (key in ['llm', 'output_unmasking']) { - const accumulator = response._getContentAccumulator(key); - const result = mergeContentAccumulator(value, accumulator); - response._setContentAccumulator(key, result); - } else { - response._setModuleResult(key, value); + const accumulator = response._getModuleResultsAccumulator(); + for (const [key, value] of Object.entries(moduleResults)) { + switch (key) { + case 'llm': { + const result = { + ...value, + 
choices: mergeChoices(accumulator[key]?.choices, value.choices) + }; + accumulator[key] = result; + break; + } + case 'output_unmasking': + accumulator[key] = mergeChoices(accumulator[key], value); + break; + default: + accumulator[key] = value; } } } yield chunk; - - mergeContentAccumulators(response); } } diff --git a/packages/orchestration/src/util/module-results.ts b/packages/orchestration/src/util/module-results.ts index 030dbc5af..56ed310b2 100644 --- a/packages/orchestration/src/util/module-results.ts +++ b/packages/orchestration/src/util/module-results.ts @@ -1,16 +1,17 @@ /** * @internal */ -export function mergeModuleResults( - ...results: (T | undefined)[] -): T | undefined { - return results.reduce((acc, result) => { - if (result === undefined) { - return acc; - } - if (acc === undefined) { - return result; - } - return { ...acc, ...result }; - }, undefined as T | undefined); +export function mergeChoices(...results: (T | undefined)[]): T | undefined { + return results.reduce( + (acc, result) => { + if (result === undefined) { + return acc; + } + if (acc === undefined) { + return result; + } + return { ...acc, ...result }; + }, + undefined as T | undefined + ); } From a942d5498c824739aaecc05c589b582adbda6468 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Tue, 8 Jul 2025 15:30:29 +0200 Subject: [PATCH 04/43] checkpoint --- .../orchestration/src/orchestration-client.ts | 5 +- .../src/orchestration-response.ts | 2 +- .../src/orchestration-stream-response.ts | 108 ++++---------- .../orchestration/src/orchestration-stream.ts | 134 ++---------------- .../orchestration/src/util/module-results.ts | 29 ++++ 5 files changed, 74 insertions(+), 204 deletions(-) diff --git a/packages/orchestration/src/orchestration-client.ts b/packages/orchestration/src/orchestration-client.ts index 4bdfc3ca3..15da8038b 100644 --- a/packages/orchestration/src/orchestration-client.ts +++ b/packages/orchestration/src/orchestration-client.ts @@ -143,9 +143,8 @@ export class 
OrchestrationClient { const stream = OrchestrationStream._create(streamResponse, controller); response.stream = stream ._pipe(OrchestrationStream._processChunk) - ._pipe(OrchestrationStream._processToolCalls, response) - ._pipe(OrchestrationStream._processFinishReason, response) - ._pipe(OrchestrationStream._processTokenUsage, response); + ._pipe(OrchestrationStream._processModuleResults, response) + ._pipe(OrchestrationStream._processStreamEnd, response); return response; } diff --git a/packages/orchestration/src/orchestration-response.ts b/packages/orchestration/src/orchestration-response.ts index e69d206d1..868d0e79d 100644 --- a/packages/orchestration/src/orchestration-response.ts +++ b/packages/orchestration/src/orchestration-response.ts @@ -103,7 +103,7 @@ export class OrchestrationResponse { // TODO: replace cast with LLMChoice[] after the bug in orchestration, where // 'role' in ResponseChatMessage is optional when it should be mandatory, is fixed. // https://github.com/SAP/ai-sdk-js-backlog/issues/306 - return (this.getChoices() as any).find( + return this.getChoices().find( (c: { index: number }) => c.index === index ); } diff --git a/packages/orchestration/src/orchestration-stream-response.ts b/packages/orchestration/src/orchestration-stream-response.ts index d9b956614..45923e133 100644 --- a/packages/orchestration/src/orchestration-stream-response.ts +++ b/packages/orchestration/src/orchestration-stream-response.ts @@ -1,7 +1,6 @@ import { createLogger } from '@sap-cloud-sdk/util'; -import type { ToolCallAccumulator } from './util/index.js'; import type { - MessageToolCalls, + LlmModuleResult, ModuleResults, TokenUsage } from './client/api/schema/index.js'; @@ -16,33 +15,22 @@ const logger = createLogger({ * Orchestration stream response. 
*/ export class OrchestrationStreamResponse { - private _moduleResults: ModuleResults | undefined; - private _moduleResultsAccumulator: ModuleResults = {}; - private _usage: TokenUsage | undefined; - /** - * Finish reasons for all choices. - */ - private _finishReasons: Map = new Map(); - private _toolCallsAccumulators: Map< - number, - Map - > = new Map(); + public openStream = true; + private moduleResults: ModuleResults | undefined; + private orchestrationResult: LlmModuleResult | undefined; private _stream: OrchestrationStream | undefined; - private _toolCalls: Map = new Map(); /** * Gets the token usage for the response. * @returns The token usage for the response. */ public getTokenUsage(): TokenUsage | undefined { - return this._usage; - } - - /** - * @internal - */ - _setTokenUsage(usage: TokenUsage): void { - this._usage = usage; + if(this.orchestrationResult) { + return this.orchestrationResult.usage; + } + logger.warn( + 'The stream is still open, the token usage is not available yet.' + ); } /** @@ -51,74 +39,38 @@ export class OrchestrationStreamResponse { * @returns The finish reason for the specified choice index. */ public getFinishReason(choiceIndex = 0): string | undefined { - return this._finishReasons.get(choiceIndex); - } - - /** - * @internal - */ - _getFinishReasons(): Map { - return this._finishReasons; - } - - /** - * @internal - */ - _setFinishReasons(finishReasons: Map): void { - this._finishReasons = finishReasons; - } - - /** - * @internal - */ - _getToolCallsAccumulators(): Map> { - return this._toolCallsAccumulators; - } - - /** - * Gets the tool calls for a specific choice index. - * @param choiceIndex - The index of the choice to get the tool calls for. - * @returns The tool calls for the specified choice index. 
- */ - public getToolCalls(choiceIndex = 0): MessageToolCalls | undefined { - return this._toolCalls.get(choiceIndex); - } - - /** - * @internal - */ - _setToolCalls(choiceIndex: number, toolCalls: MessageToolCalls): void { - this._toolCalls.set(choiceIndex, toolCalls); + if(!this.openStream) { + return this.findChoiceByIndex(choiceIndex)?.finish_reason; + } + logger.warn( + 'The stream is still open, the finish reason is not available yet.' + ); } - /** - * @internal - */ - _setModuleResults(moduleResults: ModuleResults): void { - this._moduleResults = moduleResults; + get stream(): OrchestrationStream { + if (!this._stream) { + throw new Error('Response stream is undefined.'); + } + return this._stream; } public getModuleResults(): ModuleResults | undefined { - if (this._moduleResults) { - return this._moduleResults; + if (!this.openStream) { + return this.moduleResults; } logger.warn( - 'Module results are not set, likely because the stream has not finished yet.' + 'The stream is still open, module results are not available yet.' ); } - /** - * @internal - */ - _getModuleResultsAccumulator(): ModuleResults { - return this._moduleResultsAccumulator; + private getChoices() { + return this.orchestrationResult?.choices ?? 
[]; } - get stream(): OrchestrationStream { - if (!this._stream) { - throw new Error('Response stream is undefined.'); - } - return this._stream; + private findChoiceByIndex(index: number) { + return this.getChoices().find( + (c: { index: number }) => c.index === index + ); } /** diff --git a/packages/orchestration/src/orchestration-stream.ts b/packages/orchestration/src/orchestration-stream.ts index 316e809d2..37521f0d6 100644 --- a/packages/orchestration/src/orchestration-stream.ts +++ b/packages/orchestration/src/orchestration-stream.ts @@ -2,14 +2,10 @@ import { createLogger } from '@sap-cloud-sdk/util'; import { SseStream } from '@sap-ai-sdk/core'; import { OrchestrationStreamChunkResponse } from './orchestration-stream-chunk-response.js'; import { - isMessageToolCall, - mergeChoices, - mergeToolCallChunk, - type ToolCallAccumulator + mergeChoices } from './internal.js'; import type { - CompletionPostResponseStreaming, - MessageToolCalls + CompletionPostResponseStreaming } from './client/api/schema/index.js'; import type { HttpResponse } from '@sap-cloud-sdk/http-client'; import type { OrchestrationStreamResponse } from './orchestration-stream-response.js'; @@ -54,122 +50,18 @@ export class OrchestrationStream extends SseStream { } } - /** - * @internal - */ - static async *_processToolCalls( + static async *_processStreamEnd( stream: OrchestrationStream, response?: OrchestrationStreamResponse ): AsyncGenerator { if (!response) { - throw new Error('Response is required to process tool calls.'); + throw new Error('Response is required to process stream end.'); } for await (const chunk of stream) { - chunk.data.orchestration_result?.choices.forEach(choice => { - const choiceIndex = choice.index; - const toolCallsChunks = chunk.getDeltaToolCalls(choiceIndex); - if (toolCallsChunks) { - let toolCallAccumulators = response - ._getToolCallsAccumulators() - .get(choiceIndex); - if (!toolCallAccumulators) { - toolCallAccumulators = new Map(); - response - 
._getToolCallsAccumulators() - .set(choiceIndex, toolCallAccumulators); - } - toolCallsChunks.map(toolCallChunk => { - const toolCallId = toolCallChunk.index; - const toolCallAccumulator = mergeToolCallChunk( - toolCallChunk, - toolCallAccumulators.get(toolCallId) - ); - toolCallAccumulators.set(toolCallId, toolCallAccumulator); - }); - } - }); yield chunk; } - for (const [ - choiceIndex, - toolCallsAccumulators - ] of response._getToolCallsAccumulators()) { - const toolCalls: MessageToolCalls = []; - for (const [id, acc] of toolCallsAccumulators.entries()) { - if (isMessageToolCall(acc)) { - toolCalls.push(acc); - } else { - logger.error( - `Error while parsing tool calls for choice index ${choiceIndex}: Tool call with id ${id} was incomplete.` - ); - } - } - response._setToolCalls(choiceIndex, toolCalls); - } - } - - /** - * @internal - */ - static async *_processFinishReason( - stream: OrchestrationStream, - response?: OrchestrationStreamResponse - ): AsyncGenerator { - if (!response) { - throw new Error('Response is required to process finish reasons.'); - } - for await (const chunk of stream) { - chunk.data.orchestration_result?.choices.forEach(choice => { - const choiceIndex = choice.index; - const finishReason = chunk.getFinishReason(choiceIndex); - if (finishReason) { - response._getFinishReasons().set(choiceIndex, finishReason); - switch (finishReason) { - case 'content_filter': - logger.error( - `Choice ${choiceIndex}: Stream finished with content filter hit.` - ); - break; - case 'length': - logger.error( - `Choice ${choiceIndex}: Stream finished with token length exceeded.` - ); - break; - case 'stop': - case 'tool_calls': - case 'function_call': - logger.debug(`Choice ${choiceIndex}: Stream finished.`); - break; - default: - logger.error( - `Choice ${choiceIndex}: Stream finished with unknown reason '${finishReason}'.` - ); - } - } - }); - yield chunk; - } - } - - /** - * @internal - */ - static async *_processTokenUsage( - stream: OrchestrationStream, 
- response?: OrchestrationStreamResponse - ): AsyncGenerator { - if (!response) { - throw new Error('Response is required to process token usage.'); - } - for await (const chunk of stream) { - const usage = chunk.getTokenUsage(); - if (usage) { - response._setTokenUsage(usage); - logger.debug(`Token usage: ${JSON.stringify(usage)}`); - } - yield chunk; - } + response.openStream = false; } /** @@ -186,26 +78,24 @@ export class OrchestrationStream extends SseStream { const moduleResults = chunk.data.module_results; if (moduleResults) { const accumulator = response._getModuleResultsAccumulator(); - for (const [key, value] of Object.entries(moduleResults)) { - switch (key) { + for (const [moduleName, moduleResult] of Object.entries(moduleResults)) { + switch (moduleName) { case 'llm': { - const result = { - ...value, - choices: mergeChoices(accumulator[key]?.choices, value.choices) - }; - accumulator[key] = result; + const mergedLlmResult = mergeLlmModuleResult(accumulator[moduleName], moduleResult); + accumulator[moduleName] = mergedLlmResult; break; } case 'output_unmasking': - accumulator[key] = mergeChoices(accumulator[key], value); + accumulator[moduleName] = mergeChoices(accumulator[moduleName], moduleResult); break; default: - accumulator[key] = value; + accumulator[moduleName] = moduleResult; } } } yield chunk; } + response._setModuleResults(response._getModuleResultsAccumulator()); } /** diff --git a/packages/orchestration/src/util/module-results.ts b/packages/orchestration/src/util/module-results.ts index 56ed310b2..24a4e1d92 100644 --- a/packages/orchestration/src/util/module-results.ts +++ b/packages/orchestration/src/util/module-results.ts @@ -1,3 +1,5 @@ +import type { LLMModuleResultStreaming, OrchestrationResponse } from '../index.js'; + /** * @internal */ @@ -15,3 +17,30 @@ export function mergeChoices(...results: (T | undefined)[]): T | undefined { undefined as T | undefined ); } + +/** + * @internal + */ +export function mergeLlmModuleResult( + 
...results: (LLMModuleResultStreaming | undefined)[] +): LLMModuleResult | undefined { + return results.reduce((acc, result) => { + if (result === undefined) { + return acc; + } + if (acc === undefined) { + return result; + } + return { + ...acc, + choices: mergeChoices(acc.choices, result.choices), + finish_reason: mergeChoices(acc.finish_reason, result.finish_reason), + usage: mergeChoices(acc.usage, result.usage), + }; + }, undefined as LLMModuleResult | undefined); +} + +export function mergeOrchestrationResult( + ...results: any[] // Replace 'any' with the actual type if known +): OrchestrationResponse | undefined { +} From 66a1c30c33a497db72780b88c5bdb6e1fda10a1e Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Wed, 9 Jul 2025 15:39:15 +0200 Subject: [PATCH 05/43] replicate utility --- .../orchestration/src/orchestration-client.ts | 2 +- .../src/orchestration-stream-response.ts | 110 +++++++++++++++--- .../orchestration/src/orchestration-stream.ts | 40 ++----- 3 files changed, 106 insertions(+), 46 deletions(-) diff --git a/packages/orchestration/src/orchestration-client.ts b/packages/orchestration/src/orchestration-client.ts index 15da8038b..31a2d27f1 100644 --- a/packages/orchestration/src/orchestration-client.ts +++ b/packages/orchestration/src/orchestration-client.ts @@ -143,7 +143,7 @@ export class OrchestrationClient { const stream = OrchestrationStream._create(streamResponse, controller); response.stream = stream ._pipe(OrchestrationStream._processChunk) - ._pipe(OrchestrationStream._processModuleResults, response) + ._pipe(OrchestrationStream._processOrchestrationStreamChunkResponse, response) ._pipe(OrchestrationStream._processStreamEnd, response); return response; diff --git a/packages/orchestration/src/orchestration-stream-response.ts b/packages/orchestration/src/orchestration-stream-response.ts index 45923e133..95dd164d8 100644 --- a/packages/orchestration/src/orchestration-stream-response.ts +++ 
b/packages/orchestration/src/orchestration-stream-response.ts @@ -1,7 +1,10 @@ import { createLogger } from '@sap-cloud-sdk/util'; import type { - LlmModuleResult, - ModuleResults, + AssistantChatMessage, + ChatMessage, + ChatMessages, + CompletionPostResponse, + MessageToolCalls, TokenUsage } from './client/api/schema/index.js'; import type { OrchestrationStream } from './orchestration-stream.js'; @@ -15,9 +18,8 @@ const logger = createLogger({ * Orchestration stream response. */ export class OrchestrationStreamResponse { - public openStream = true; - private moduleResults: ModuleResults | undefined; - private orchestrationResult: LlmModuleResult | undefined; + public _openStream = true; + public _data: Partial = {}; private _stream: OrchestrationStream | undefined; /** @@ -25,11 +27,11 @@ export class OrchestrationStreamResponse { * @returns The token usage for the response. */ public getTokenUsage(): TokenUsage | undefined { - if(this.orchestrationResult) { - return this.orchestrationResult.usage; + if(!this._openStream) { + return this._data.orchestration_result?.usage; } logger.warn( - 'The stream is still open, the token usage is not available yet.' + 'The stream is still open, the token usage is not available yet. Please wait until the stream is closed.' ); } @@ -39,14 +41,92 @@ export class OrchestrationStreamResponse { * @returns The finish reason for the specified choice index. */ public getFinishReason(choiceIndex = 0): string | undefined { - if(!this.openStream) { + if(!this._openStream) { return this.findChoiceByIndex(choiceIndex)?.finish_reason; } logger.warn( - 'The stream is still open, the finish reason is not available yet.' + 'The stream is still open, the finish reason is not available yet. Please wait until the stream is closed.' ); } + /** + * Parses the orchestration response and returns the content. + * If the response was filtered, an error is thrown. + * @param choiceIndex - The index of the choice to parse. 
+ * @returns The message content. + */ + // need to check for content filter hits in the post processing + public getContent(choiceIndex = 0): string | undefined { + if(!this._openStream) { + const choice = this.findChoiceByIndex(choiceIndex); + return choice?.message?.content; + } + logger.warn( + 'The stream is still open, the content is not available yet. Please wait until the stream is closed.' + ); + } + + /** + * Parses the orchestration response and returns the tool calls generated by the model. + * @param choiceIndex - The index of the choice to parse. + * @returns The message tool calls. + */ + public getToolCalls(choiceIndex = 0): MessageToolCalls | undefined { + if(!this._openStream) { + const choice = this.findChoiceByIndex(choiceIndex); + return choice?.message?.tool_calls; + } + logger.warn( + 'The stream is still open, the tool calls are not available yet. Please wait until the stream is closed.' + ); + } + + /** + * Parses the orchestration response and returns the refusal message generated by the model. + * @param choiceIndex - The index of the choice to parse. + * @returns The refusal string. + */ + public getRefusal(choiceIndex = 0): string | undefined { + if(!this._openStream) { + const choice = this.findChoiceByIndex(choiceIndex); + return choice?.message?.refusal; + } + logger.warn( + 'The stream is still open, the refusal message is not available yet. Please wait until the stream is closed.' + ); + } + + /** + * Messages that can be used for subsequent prompts as message history. + * @param choiceIndex - The index of the choice to parse. + * @returns A list of all messages. + */ + public getAllMessages(choiceIndex = 0): ChatMessages | undefined { + if(!this._openStream) { + const messages: ChatMessage[] = this._data.module_results?.templating ?? []; + const content = this.findChoiceByIndex(choiceIndex)?.message; + return content ? 
[...messages, content] : messages; + } + logger.warn( + 'The stream is still open, the messages are not available yet. Please wait until the stream is closed.' + ); + } + + /** + * Gets the assistant message from the response. + * @param choiceIndex - The index of the choice to use (default is 0). + * @returns The assistant message. + */ + + public getAssistantMessage(choiceIndex = 0): AssistantChatMessage | undefined { + if(!this._openStream) { + return this.findChoiceByIndex(choiceIndex)?.message; + } + logger.warn( + 'The stream is still open, the assistant message is not available yet. Please wait until the stream is closed.' + ); + } + get stream(): OrchestrationStream { if (!this._stream) { throw new Error('Response stream is undefined.'); @@ -54,17 +134,17 @@ export class OrchestrationStreamResponse { return this._stream; } - public getModuleResults(): ModuleResults | undefined { - if (!this.openStream) { - return this.moduleResults; + public getResponse(): CompletionPostResponse | undefined { + if(!this._openStream) { + return this._data as CompletionPostResponse; } logger.warn( - 'The stream is still open, module results are not available yet.' + 'The stream is still open, the response is not available yet. Please wait until the stream is closed.' ); } private getChoices() { - return this.orchestrationResult?.choices ?? []; + return this._data.orchestration_result?.choices ?? 
[]; } private findChoiceByIndex(index: number) { diff --git a/packages/orchestration/src/orchestration-stream.ts b/packages/orchestration/src/orchestration-stream.ts index 37521f0d6..f96e710f8 100644 --- a/packages/orchestration/src/orchestration-stream.ts +++ b/packages/orchestration/src/orchestration-stream.ts @@ -1,9 +1,6 @@ import { createLogger } from '@sap-cloud-sdk/util'; import { SseStream } from '@sap-ai-sdk/core'; import { OrchestrationStreamChunkResponse } from './orchestration-stream-chunk-response.js'; -import { - mergeChoices -} from './internal.js'; import type { CompletionPostResponseStreaming } from './client/api/schema/index.js'; @@ -50,52 +47,35 @@ export class OrchestrationStream extends SseStream { } } - static async *_processStreamEnd( + static async *_processOrchestrationStreamChunkResponse( stream: OrchestrationStream, response?: OrchestrationStreamResponse ): AsyncGenerator { if (!response) { - throw new Error('Response is required to process stream end.'); + throw new Error('Response is required to process completion post response streaming.'); } for await (const chunk of stream) { + // process request id + // process orchestration result + // process module results yield chunk; } - response.openStream = false; + // post processing for aggregation } - /** - * @internal - */ - static async *_processModuleResults( + static async *_processStreamEnd( stream: OrchestrationStream, response?: OrchestrationStreamResponse ): AsyncGenerator { if (!response) { - throw new Error('Response is required to process module results.'); + throw new Error('Response is required to process stream end.'); } for await (const chunk of stream) { - const moduleResults = chunk.data.module_results; - if (moduleResults) { - const accumulator = response._getModuleResultsAccumulator(); - for (const [moduleName, moduleResult] of Object.entries(moduleResults)) { - switch (moduleName) { - case 'llm': { - const mergedLlmResult = mergeLlmModuleResult(accumulator[moduleName], 
moduleResult); - accumulator[moduleName] = mergedLlmResult; - break; - } - case 'output_unmasking': - accumulator[moduleName] = mergeChoices(accumulator[moduleName], moduleResult); - break; - default: - accumulator[moduleName] = moduleResult; - } - } - } yield chunk; } - response._setModuleResults(response._getModuleResultsAccumulator()); + + response._openStream = false; } /** From 11da539883c9688ac2c0b73cf8852c21e53d1810 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Wed, 9 Jul 2025 16:05:52 +0200 Subject: [PATCH 06/43] start utility functions --- .../src/orchestration-stream-response.ts | 14 +++--- .../orchestration/src/orchestration-stream.ts | 12 +---- .../orchestration/src/util/module-results.ts | 46 ------------------- packages/orchestration/src/util/stream.ts | 40 ++++++++++++++++ 4 files changed, 49 insertions(+), 63 deletions(-) delete mode 100644 packages/orchestration/src/util/module-results.ts create mode 100644 packages/orchestration/src/util/stream.ts diff --git a/packages/orchestration/src/orchestration-stream-response.ts b/packages/orchestration/src/orchestration-stream-response.ts index 95dd164d8..a48f094c6 100644 --- a/packages/orchestration/src/orchestration-stream-response.ts +++ b/packages/orchestration/src/orchestration-stream-response.ts @@ -127,13 +127,6 @@ export class OrchestrationStreamResponse { ); } - get stream(): OrchestrationStream { - if (!this._stream) { - throw new Error('Response stream is undefined.'); - } - return this._stream; - } - public getResponse(): CompletionPostResponse | undefined { if(!this._openStream) { return this._data as CompletionPostResponse; @@ -143,6 +136,13 @@ export class OrchestrationStreamResponse { ); } + get stream(): OrchestrationStream { + if (!this._stream) { + throw new Error('Response stream is undefined.'); + } + return this._stream; + } + private getChoices() { return this._data.orchestration_result?.choices ?? 
[]; } diff --git a/packages/orchestration/src/orchestration-stream.ts b/packages/orchestration/src/orchestration-stream.ts index f96e710f8..b51fc05d4 100644 --- a/packages/orchestration/src/orchestration-stream.ts +++ b/packages/orchestration/src/orchestration-stream.ts @@ -1,17 +1,13 @@ import { createLogger } from '@sap-cloud-sdk/util'; import { SseStream } from '@sap-ai-sdk/core'; import { OrchestrationStreamChunkResponse } from './orchestration-stream-chunk-response.js'; +import { mergeStreamResponse } from './util/stream.js'; import type { CompletionPostResponseStreaming } from './client/api/schema/index.js'; import type { HttpResponse } from '@sap-cloud-sdk/http-client'; import type { OrchestrationStreamResponse } from './orchestration-stream-response.js'; -const logger = createLogger({ - package: 'orchestration', - messageContext: 'orchestration-chat-completion-stream' -}); - /** * Orchestration stream containing post-processing functions. */ @@ -55,13 +51,9 @@ export class OrchestrationStream extends SseStream { throw new Error('Response is required to process completion post response streaming.'); } for await (const chunk of stream) { - // process request id - // process orchestration result - // process module results + mergeStreamResponse(chunk.data, response); yield chunk; } - - // post processing for aggregation } static async *_processStreamEnd( diff --git a/packages/orchestration/src/util/module-results.ts b/packages/orchestration/src/util/module-results.ts deleted file mode 100644 index 24a4e1d92..000000000 --- a/packages/orchestration/src/util/module-results.ts +++ /dev/null @@ -1,46 +0,0 @@ -import type { LLMModuleResultStreaming, OrchestrationResponse } from '../index.js'; - -/** - * @internal - */ -export function mergeChoices(...results: (T | undefined)[]): T | undefined { - return results.reduce( - (acc, result) => { - if (result === undefined) { - return acc; - } - if (acc === undefined) { - return result; - } - return { ...acc, ...result }; 
- }, - undefined as T | undefined - ); -} - -/** - * @internal - */ -export function mergeLlmModuleResult( - ...results: (LLMModuleResultStreaming | undefined)[] -): LLMModuleResult | undefined { - return results.reduce((acc, result) => { - if (result === undefined) { - return acc; - } - if (acc === undefined) { - return result; - } - return { - ...acc, - choices: mergeChoices(acc.choices, result.choices), - finish_reason: mergeChoices(acc.finish_reason, result.finish_reason), - usage: mergeChoices(acc.usage, result.usage), - }; - }, undefined as LLMModuleResult | undefined); -} - -export function mergeOrchestrationResult( - ...results: any[] // Replace 'any' with the actual type if known -): OrchestrationResponse | undefined { -} diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts new file mode 100644 index 000000000..e5bc3598e --- /dev/null +++ b/packages/orchestration/src/util/stream.ts @@ -0,0 +1,40 @@ +import type { CompletionPostResponseStreaming, OrchestrationStreamChunkResponse, OrchestrationStreamResponse } from '../index.js'; + +/** + * @internal + */ +export function mergeStreamResponse( + chunk: CompletionPostResponseStreaming, + response: OrchestrationStreamResponse +): void { + const data = response._data; + data.request_id = chunk.request_id; + data.module_results = mergeModuleResults(data.module_results, chunk.module_results); + data.orchestration_result = mergeOrchestrationResult(data.orchestration_result, chunk.orchestration_result); +} + +function mergeModuleResults( + existing: Record | undefined, + incoming: Record | undefined +): Record | undefined { + if (!existing) { + return incoming; + } + if (!incoming) { + return existing; + } + return { ...existing, ...incoming }; +} + +function mergeOrchestrationResult( + existing: Record | undefined, + incoming: Record | undefined +): Record | undefined { + if (!existing) { + return incoming; + } + if (!incoming) { + return existing; + } + return { 
...existing, ...incoming }; +} From 831e861b154f4060196291b244a811c8dd54d6a4 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Wed, 9 Jul 2025 20:50:23 +0200 Subject: [PATCH 07/43] refactor --- .../src/orchestration-stream-response.ts | 38 +++++++------------ .../orchestration/src/orchestration-stream.ts | 3 +- packages/orchestration/src/util/index.ts | 2 +- packages/orchestration/src/util/stream.ts | 22 +++-------- 4 files changed, 21 insertions(+), 44 deletions(-) diff --git a/packages/orchestration/src/orchestration-stream-response.ts b/packages/orchestration/src/orchestration-stream-response.ts index a48f094c6..bf12889e9 100644 --- a/packages/orchestration/src/orchestration-stream-response.ts +++ b/packages/orchestration/src/orchestration-stream-response.ts @@ -14,6 +14,12 @@ const logger = createLogger({ messageContext: 'orchestration-stream-response' }); +function openStreamWarning(missingData: string): void { + logger.warn( + `The stream is still open, the ${missingData} data is not available yet. Please wait until the stream is closed.` + ); +} + /** * Orchestration stream response. */ @@ -30,9 +36,7 @@ export class OrchestrationStreamResponse { if(!this._openStream) { return this._data.orchestration_result?.usage; } - logger.warn( - 'The stream is still open, the token usage is not available yet. Please wait until the stream is closed.' - ); + openStreamWarning('token usage'); } /** @@ -44,9 +48,7 @@ export class OrchestrationStreamResponse { if(!this._openStream) { return this.findChoiceByIndex(choiceIndex)?.finish_reason; } - logger.warn( - 'The stream is still open, the finish reason is not available yet. Please wait until the stream is closed.' - ); + openStreamWarning('finish reason'); } /** @@ -61,9 +63,7 @@ export class OrchestrationStreamResponse { const choice = this.findChoiceByIndex(choiceIndex); return choice?.message?.content; } - logger.warn( - 'The stream is still open, the content is not available yet. Please wait until the stream is closed.' 
- ); + openStreamWarning('content'); } /** @@ -76,9 +76,7 @@ export class OrchestrationStreamResponse { const choice = this.findChoiceByIndex(choiceIndex); return choice?.message?.tool_calls; } - logger.warn( - 'The stream is still open, the tool calls are not available yet. Please wait until the stream is closed.' - ); + openStreamWarning('tool calls'); } /** @@ -91,9 +89,7 @@ export class OrchestrationStreamResponse { const choice = this.findChoiceByIndex(choiceIndex); return choice?.message?.refusal; } - logger.warn( - 'The stream is still open, the refusal message is not available yet. Please wait until the stream is closed.' - ); + openStreamWarning('refusal message'); } /** @@ -107,9 +103,7 @@ export class OrchestrationStreamResponse { const content = this.findChoiceByIndex(choiceIndex)?.message; return content ? [...messages, content] : messages; } - logger.warn( - 'The stream is still open, the messages are not available yet. Please wait until the stream is closed.' - ); + openStreamWarning('messages'); } /** @@ -122,18 +116,14 @@ export class OrchestrationStreamResponse { if(!this._openStream) { return this.findChoiceByIndex(choiceIndex)?.message; } - logger.warn( - 'The stream is still open, the assistant message is not available yet. Please wait until the stream is closed.' - ); + openStreamWarning('assistant message'); } public getResponse(): CompletionPostResponse | undefined { if(!this._openStream) { return this._data as CompletionPostResponse; } - logger.warn( - 'The stream is still open, the response is not available yet. Please wait until the stream is closed.' 
- ); + openStreamWarning('response'); } get stream(): OrchestrationStream { diff --git a/packages/orchestration/src/orchestration-stream.ts b/packages/orchestration/src/orchestration-stream.ts index b51fc05d4..a84a31d41 100644 --- a/packages/orchestration/src/orchestration-stream.ts +++ b/packages/orchestration/src/orchestration-stream.ts @@ -1,7 +1,6 @@ -import { createLogger } from '@sap-cloud-sdk/util'; import { SseStream } from '@sap-ai-sdk/core'; import { OrchestrationStreamChunkResponse } from './orchestration-stream-chunk-response.js'; -import { mergeStreamResponse } from './util/stream.js'; +import { mergeStreamResponse } from './util/index.js'; import type { CompletionPostResponseStreaming } from './client/api/schema/index.js'; diff --git a/packages/orchestration/src/util/index.ts b/packages/orchestration/src/util/index.ts index 2431a196b..940d00946 100644 --- a/packages/orchestration/src/util/index.ts +++ b/packages/orchestration/src/util/index.ts @@ -4,4 +4,4 @@ export * from './module-config.js'; export * from './masking.js'; export * from './translation.js'; export * from './tool-calls.js'; -export * from './module-results.js'; +export * from './stream.js'; diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index e5bc3598e..f3e1d00f7 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -1,4 +1,4 @@ -import type { CompletionPostResponseStreaming, OrchestrationStreamChunkResponse, OrchestrationStreamResponse } from '../index.js'; +import type { CompletionPostResponseStreaming, LlmModuleResult, ModuleResults, OrchestrationStreamChunkResponse, OrchestrationStreamResponse } from '../index.js'; /** * @internal @@ -16,25 +16,13 @@ export function mergeStreamResponse( function mergeModuleResults( existing: Record | undefined, incoming: Record | undefined -): Record | undefined { - if (!existing) { - return incoming; - } - if (!incoming) { - return existing; - } 
- return { ...existing, ...incoming }; +): ModuleResults { + return { ...existing, ...incoming } as ModuleResults; } function mergeOrchestrationResult( existing: Record | undefined, incoming: Record | undefined -): Record | undefined { - if (!existing) { - return incoming; - } - if (!incoming) { - return existing; - } - return { ...existing, ...incoming }; +): LlmModuleResult { + return { ...existing, ...incoming } as LlmModuleResult; } From 8092d5f2df12e4e2df39550d92ca85f8d78cd626 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 10 Jul 2025 10:45:59 +0200 Subject: [PATCH 08/43] save --- packages/orchestration/src/util/stream.ts | 36 ++++++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index f3e1d00f7..1dcb9d75a 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -1,4 +1,4 @@ -import type { CompletionPostResponseStreaming, LlmModuleResult, ModuleResults, OrchestrationStreamChunkResponse, OrchestrationStreamResponse } from '../index.js'; +import type { CompletionPostResponseStreaming, LlmChoice, LlmChoiceStreaming, LlmModuleResult, LLMModuleResultStreaming, ModuleResults, ModuleResultsStreaming, OrchestrationStreamChunkResponse, OrchestrationStreamResponse } from '../index.js'; /** * @internal @@ -10,19 +10,39 @@ export function mergeStreamResponse( const data = response._data; data.request_id = chunk.request_id; data.module_results = mergeModuleResults(data.module_results, chunk.module_results); - data.orchestration_result = mergeOrchestrationResult(data.orchestration_result, chunk.orchestration_result); + data.orchestration_result = mergeLlmModule(data.orchestration_result, chunk.orchestration_result); } function mergeModuleResults( - existing: Record | undefined, - incoming: Record | undefined + existing: ModuleResults | undefined, + incoming: ModuleResultsStreaming | undefined ): 
ModuleResults { - return { ...existing, ...incoming } as ModuleResults; + const mergedModuleResults = { ...existing }; + for(const [moduleName, moduleResult] of Object.entries(incoming || {})) { + switch(moduleName) { + case 'llm': + mergedModuleResults[moduleName] = mergeLlmModule(mergedModuleResults[moduleName], moduleResult); + break; + case 'output_unmasking': + mergedModuleResults[moduleName] = mergeOutputUnmaskingModule(mergedModuleResults[moduleName], moduleResult); + break; + default: + mergedModuleResults[moduleName] = moduleResult; + } + } + return mergedModuleResults; } -function mergeOrchestrationResult( - existing: Record | undefined, - incoming: Record | undefined +function mergeLlmModule( + existing: LlmModuleResult | undefined, + incoming: LLMModuleResultStreaming | undefined ): LlmModuleResult { return { ...existing, ...incoming } as LlmModuleResult; } + +function mergeOutputUnmaskingModule( + existing: LlmChoice[] | undefined, + incoming: LlmChoiceStreaming[] | undefined +): LlmChoice[] { + return [...(existing || []), ...(incoming || [])] as LlmChoice[]; +} From 1c276fef149f58f971c031f1b8d1c4ebe59d773c Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 10 Jul 2025 11:17:59 +0200 Subject: [PATCH 09/43] checkpoint --- packages/orchestration/src/util/stream.ts | 39 ++++++++++++++++++++--- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 1dcb9d75a..5dbceeb5e 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -24,7 +24,7 @@ function mergeModuleResults( mergedModuleResults[moduleName] = mergeLlmModule(mergedModuleResults[moduleName], moduleResult); break; case 'output_unmasking': - mergedModuleResults[moduleName] = mergeOutputUnmaskingModule(mergedModuleResults[moduleName], moduleResult); + mergedModuleResults[moduleName] = mergeLlmChoices(mergedModuleResults[moduleName], moduleResult); break; 
default: mergedModuleResults[moduleName] = moduleResult; @@ -36,13 +36,42 @@ function mergeModuleResults( function mergeLlmModule( existing: LlmModuleResult | undefined, incoming: LLMModuleResultStreaming | undefined -): LlmModuleResult { - return { ...existing, ...incoming } as LlmModuleResult; +): LlmModuleResult | undefined { + if(!incoming) { + return existing + } + const mergedModuleResults = { + ...incoming, + usage: mergeTokenUsage(existing?.usage, incoming.usage), + choices: mergeLlmChoices(existing?.choices, incoming?.choices) + }; + return mergedModuleResults; +} + +function mergeTokenUsage( + existing: { prompt_tokens: number; completion_tokens: number; total_tokens: number } | undefined, + incoming: { prompt_tokens: number; completion_tokens: number; total_tokens: number } | undefined +): { prompt_tokens: number; completion_tokens: number; total_tokens: number } { + return { + prompt_tokens: incoming?.prompt_tokens ?? existing?.prompt_tokens ?? 0, + completion_tokens: incoming?.completion_tokens ?? existing?.completion_tokens ?? 0, + total_tokens: incoming?.total_tokens ?? existing?.total_tokens ?? 0 + }; } -function mergeOutputUnmaskingModule( +function mergeLlmChoices( existing: LlmChoice[] | undefined, incoming: LlmChoiceStreaming[] | undefined ): LlmChoice[] { - return [...(existing || []), ...(incoming || [])] as LlmChoice[]; + return [ + ...(existing ?? []), + ...(incoming?.map(choice => ({ + incoming: choice.delta. + ...choice, + message: choice.message ? { + ...choice.message, + content: choice.message.content + } : undefined + })) ?? 
[]) + ] as LlmChoice[]; } From cf30502b62166071b48de869f32a644cc875245b Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 10 Jul 2025 11:20:54 +0200 Subject: [PATCH 10/43] lint --- .../orchestration/src/orchestration-client.ts | 5 +- .../src/orchestration-response.ts | 4 +- .../src/orchestration-stream-response.ts | 129 +++++++++--------- .../orchestration/src/orchestration-stream.ts | 8 +- packages/orchestration/src/util/stream.ts | 66 ++++++--- 5 files changed, 121 insertions(+), 91 deletions(-) diff --git a/packages/orchestration/src/orchestration-client.ts b/packages/orchestration/src/orchestration-client.ts index 31a2d27f1..ae47293de 100644 --- a/packages/orchestration/src/orchestration-client.ts +++ b/packages/orchestration/src/orchestration-client.ts @@ -143,7 +143,10 @@ export class OrchestrationClient { const stream = OrchestrationStream._create(streamResponse, controller); response.stream = stream ._pipe(OrchestrationStream._processChunk) - ._pipe(OrchestrationStream._processOrchestrationStreamChunkResponse, response) + ._pipe( + OrchestrationStream._processOrchestrationStreamChunkResponse, + response + ) ._pipe(OrchestrationStream._processStreamEnd, response); return response; diff --git a/packages/orchestration/src/orchestration-response.ts b/packages/orchestration/src/orchestration-response.ts index 868d0e79d..e828c4667 100644 --- a/packages/orchestration/src/orchestration-response.ts +++ b/packages/orchestration/src/orchestration-response.ts @@ -103,8 +103,6 @@ export class OrchestrationResponse { // TODO: replace cast with LLMChoice[] after the bug in orchestration, where // 'role' in ResponseChatMessage is optional when it should be mandatory, is fixed. 
// https://github.com/SAP/ai-sdk-js-backlog/issues/306 - return this.getChoices().find( - (c: { index: number }) => c.index === index - ); + return this.getChoices().find((c: { index: number }) => c.index === index); } } diff --git a/packages/orchestration/src/orchestration-stream-response.ts b/packages/orchestration/src/orchestration-stream-response.ts index bf12889e9..84405df3f 100644 --- a/packages/orchestration/src/orchestration-stream-response.ts +++ b/packages/orchestration/src/orchestration-stream-response.ts @@ -33,7 +33,7 @@ export class OrchestrationStreamResponse { * @returns The token usage for the response. */ public getTokenUsage(): TokenUsage | undefined { - if(!this._openStream) { + if (!this._openStream) { return this._data.orchestration_result?.usage; } openStreamWarning('token usage'); @@ -45,82 +45,85 @@ export class OrchestrationStreamResponse { * @returns The finish reason for the specified choice index. */ public getFinishReason(choiceIndex = 0): string | undefined { - if(!this._openStream) { + if (!this._openStream) { return this.findChoiceByIndex(choiceIndex)?.finish_reason; } openStreamWarning('finish reason'); } - /** - * Parses the orchestration response and returns the content. - * If the response was filtered, an error is thrown. - * @param choiceIndex - The index of the choice to parse. - * @returns The message content. - */ - // need to check for content filter hits in the post processing - public getContent(choiceIndex = 0): string | undefined { - if(!this._openStream) { - const choice = this.findChoiceByIndex(choiceIndex); - return choice?.message?.content; - } - openStreamWarning('content'); + /** + * Parses the orchestration response and returns the content. + * If the response was filtered, an error is thrown. + * @param choiceIndex - The index of the choice to parse. + * @returns The message content. 
+ */ + // need to check for content filter hits in the post processing + public getContent(choiceIndex = 0): string | undefined { + if (!this._openStream) { + const choice = this.findChoiceByIndex(choiceIndex); + return choice?.message?.content; } + openStreamWarning('content'); + } - /** - * Parses the orchestration response and returns the tool calls generated by the model. - * @param choiceIndex - The index of the choice to parse. - * @returns The message tool calls. - */ - public getToolCalls(choiceIndex = 0): MessageToolCalls | undefined { - if(!this._openStream) { - const choice = this.findChoiceByIndex(choiceIndex); - return choice?.message?.tool_calls; - } - openStreamWarning('tool calls'); + /** + * Parses the orchestration response and returns the tool calls generated by the model. + * @param choiceIndex - The index of the choice to parse. + * @returns The message tool calls. + */ + public getToolCalls(choiceIndex = 0): MessageToolCalls | undefined { + if (!this._openStream) { + const choice = this.findChoiceByIndex(choiceIndex); + return choice?.message?.tool_calls; } + openStreamWarning('tool calls'); + } - /** - * Parses the orchestration response and returns the refusal message generated by the model. - * @param choiceIndex - The index of the choice to parse. - * @returns The refusal string. - */ - public getRefusal(choiceIndex = 0): string | undefined { - if(!this._openStream) { - const choice = this.findChoiceByIndex(choiceIndex); - return choice?.message?.refusal; - } - openStreamWarning('refusal message'); + /** + * Parses the orchestration response and returns the refusal message generated by the model. + * @param choiceIndex - The index of the choice to parse. + * @returns The refusal string. 
+ */ + public getRefusal(choiceIndex = 0): string | undefined { + if (!this._openStream) { + const choice = this.findChoiceByIndex(choiceIndex); + return choice?.message?.refusal; } + openStreamWarning('refusal message'); + } - /** - * Messages that can be used for subsequent prompts as message history. - * @param choiceIndex - The index of the choice to parse. - * @returns A list of all messages. - */ - public getAllMessages(choiceIndex = 0): ChatMessages | undefined { - if(!this._openStream) { - const messages: ChatMessage[] = this._data.module_results?.templating ?? []; - const content = this.findChoiceByIndex(choiceIndex)?.message; - return content ? [...messages, content] : messages; - } - openStreamWarning('messages'); + /** + * Messages that can be used for subsequent prompts as message history. + * @param choiceIndex - The index of the choice to parse. + * @returns A list of all messages. + */ + public getAllMessages(choiceIndex = 0): ChatMessages | undefined { + if (!this._openStream) { + const messages: ChatMessage[] = + this._data.module_results?.templating ?? []; + const content = this.findChoiceByIndex(choiceIndex)?.message; + return content ? [...messages, content] : messages; } + openStreamWarning('messages'); + } - /** - * Gets the assistant message from the response. - * @param choiceIndex - The index of the choice to use (default is 0). - * @returns The assistant message. - */ - - public getAssistantMessage(choiceIndex = 0): AssistantChatMessage | undefined { - if(!this._openStream) { - return this.findChoiceByIndex(choiceIndex)?.message; - } - openStreamWarning('assistant message'); + /** + * Gets the assistant message from the response. + * @param choiceIndex - The index of the choice to use (default is 0). + * @returns The assistant message. 
+ */ + + public getAssistantMessage( + choiceIndex = 0 + ): AssistantChatMessage | undefined { + if (!this._openStream) { + return this.findChoiceByIndex(choiceIndex)?.message; } + openStreamWarning('assistant message'); + } public getResponse(): CompletionPostResponse | undefined { - if(!this._openStream) { + if (!this._openStream) { return this._data as CompletionPostResponse; } openStreamWarning('response'); @@ -138,9 +141,7 @@ export class OrchestrationStreamResponse { } private findChoiceByIndex(index: number) { - return this.getChoices().find( - (c: { index: number }) => c.index === index - ); + return this.getChoices().find((c: { index: number }) => c.index === index); } /** diff --git a/packages/orchestration/src/orchestration-stream.ts b/packages/orchestration/src/orchestration-stream.ts index a84a31d41..0f9011b7b 100644 --- a/packages/orchestration/src/orchestration-stream.ts +++ b/packages/orchestration/src/orchestration-stream.ts @@ -1,9 +1,7 @@ import { SseStream } from '@sap-ai-sdk/core'; import { OrchestrationStreamChunkResponse } from './orchestration-stream-chunk-response.js'; import { mergeStreamResponse } from './util/index.js'; -import type { - CompletionPostResponseStreaming -} from './client/api/schema/index.js'; +import type { CompletionPostResponseStreaming } from './client/api/schema/index.js'; import type { HttpResponse } from '@sap-cloud-sdk/http-client'; import type { OrchestrationStreamResponse } from './orchestration-stream-response.js'; @@ -47,7 +45,9 @@ export class OrchestrationStream extends SseStream { response?: OrchestrationStreamResponse ): AsyncGenerator { if (!response) { - throw new Error('Response is required to process completion post response streaming.'); + throw new Error( + 'Response is required to process completion post response streaming.' 
+ ); } for await (const chunk of stream) { mergeStreamResponse(chunk.data, response); diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 5dbceeb5e..7bb8ebb01 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -1,4 +1,14 @@ -import type { CompletionPostResponseStreaming, LlmChoice, LlmChoiceStreaming, LlmModuleResult, LLMModuleResultStreaming, ModuleResults, ModuleResultsStreaming, OrchestrationStreamChunkResponse, OrchestrationStreamResponse } from '../index.js'; +import type { + CompletionPostResponseStreaming, + LlmChoice, + LlmChoiceStreaming, + LlmModuleResult, + LLMModuleResultStreaming, + ModuleResults, + ModuleResultsStreaming, + OrchestrationStreamChunkResponse, + OrchestrationStreamResponse +} from '../index.js'; /** * @internal @@ -7,10 +17,16 @@ export function mergeStreamResponse( chunk: CompletionPostResponseStreaming, response: OrchestrationStreamResponse ): void { - const data = response._data; - data.request_id = chunk.request_id; - data.module_results = mergeModuleResults(data.module_results, chunk.module_results); - data.orchestration_result = mergeLlmModule(data.orchestration_result, chunk.orchestration_result); + const data = response._data; + data.request_id = chunk.request_id; + data.module_results = mergeModuleResults( + data.module_results, + chunk.module_results + ); + data.orchestration_result = mergeLlmModule( + data.orchestration_result, + chunk.orchestration_result + ); } function mergeModuleResults( @@ -18,13 +34,19 @@ function mergeModuleResults( incoming: ModuleResultsStreaming | undefined ): ModuleResults { const mergedModuleResults = { ...existing }; - for(const [moduleName, moduleResult] of Object.entries(incoming || {})) { - switch(moduleName) { + for (const [moduleName, moduleResult] of Object.entries(incoming || {})) { + switch (moduleName) { case 'llm': - mergedModuleResults[moduleName] = 
mergeLlmModule(mergedModuleResults[moduleName], moduleResult); + mergedModuleResults[moduleName] = mergeLlmModule( + mergedModuleResults[moduleName], + moduleResult + ); break; case 'output_unmasking': - mergedModuleResults[moduleName] = mergeLlmChoices(mergedModuleResults[moduleName], moduleResult); + mergedModuleResults[moduleName] = mergeLlmChoices( + mergedModuleResults[moduleName], + moduleResult + ); break; default: mergedModuleResults[moduleName] = moduleResult; @@ -37,8 +59,8 @@ function mergeLlmModule( existing: LlmModuleResult | undefined, incoming: LLMModuleResultStreaming | undefined ): LlmModuleResult | undefined { - if(!incoming) { - return existing + if (!incoming) { + return existing; } const mergedModuleResults = { ...incoming, @@ -49,12 +71,17 @@ function mergeLlmModule( } function mergeTokenUsage( - existing: { prompt_tokens: number; completion_tokens: number; total_tokens: number } | undefined, - incoming: { prompt_tokens: number; completion_tokens: number; total_tokens: number } | undefined + existing: + | { prompt_tokens: number; completion_tokens: number; total_tokens: number } + | undefined, + incoming: + | { prompt_tokens: number; completion_tokens: number; total_tokens: number } + | undefined ): { prompt_tokens: number; completion_tokens: number; total_tokens: number } { return { prompt_tokens: incoming?.prompt_tokens ?? existing?.prompt_tokens ?? 0, - completion_tokens: incoming?.completion_tokens ?? existing?.completion_tokens ?? 0, + completion_tokens: + incoming?.completion_tokens ?? existing?.completion_tokens ?? 0, total_tokens: incoming?.total_tokens ?? existing?.total_tokens ?? 0 }; } @@ -66,12 +93,13 @@ function mergeLlmChoices( return [ ...(existing ?? []), ...(incoming?.map(choice => ({ - incoming: choice.delta. ...choice, - message: choice.message ? { - ...choice.message, - content: choice.message.content - } : undefined + message: choice.message + ? 
{ + ...choice.message, + content: choice.message.content + } + : undefined })) ?? []) ] as LlmChoice[]; } From e88e267ba3ef640d8f05f34f9cdc5ecddc203a15 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 10 Jul 2025 12:47:13 +0200 Subject: [PATCH 11/43] lint --- .../orchestration/src/orchestration-stream.ts | 3 +- packages/orchestration/src/util/index.ts | 1 - packages/orchestration/src/util/stream.ts | 97 ++++++++++++++++++- packages/orchestration/src/util/tool-calls.ts | 78 --------------- 4 files changed, 98 insertions(+), 81 deletions(-) delete mode 100644 packages/orchestration/src/util/tool-calls.ts diff --git a/packages/orchestration/src/orchestration-stream.ts b/packages/orchestration/src/orchestration-stream.ts index 0f9011b7b..26a23db50 100644 --- a/packages/orchestration/src/orchestration-stream.ts +++ b/packages/orchestration/src/orchestration-stream.ts @@ -1,6 +1,6 @@ import { SseStream } from '@sap-ai-sdk/core'; import { OrchestrationStreamChunkResponse } from './orchestration-stream-chunk-response.js'; -import { mergeStreamResponse } from './util/index.js'; +import { mergeStreamResponse, validateResponse } from './util/index.js'; import type { CompletionPostResponseStreaming } from './client/api/schema/index.js'; import type { HttpResponse } from '@sap-cloud-sdk/http-client'; import type { OrchestrationStreamResponse } from './orchestration-stream-response.js'; @@ -67,6 +67,7 @@ export class OrchestrationStream extends SseStream { } response._openStream = false; + validateResponse(response); } /** diff --git a/packages/orchestration/src/util/index.ts b/packages/orchestration/src/util/index.ts index 940d00946..0e5865093 100644 --- a/packages/orchestration/src/util/index.ts +++ b/packages/orchestration/src/util/index.ts @@ -3,5 +3,4 @@ export * from './grounding.js'; export * from './module-config.js'; export * from './masking.js'; export * from './translation.js'; -export * from './tool-calls.js'; export * from './stream.js'; diff --git 
a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 7bb8ebb01..a09964e89 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -1,15 +1,23 @@ +import { createLogger } from '@sap-cloud-sdk/util'; import type { CompletionPostResponseStreaming, LlmChoice, LlmChoiceStreaming, LlmModuleResult, LLMModuleResultStreaming, + MessageToolCall, ModuleResults, ModuleResultsStreaming, OrchestrationStreamChunkResponse, - OrchestrationStreamResponse + OrchestrationStreamResponse, + ResponseChatMessage } from '../index.js'; +const logger = createLogger({ + package: 'orchestration', + messageContext: 'stream-util' +}); + /** * @internal */ @@ -103,3 +111,90 @@ function mergeLlmChoices( })) ?? []) ] as LlmChoice[]; } + +/** + * @internal + */ +export function validateResponse( + response: OrchestrationStreamResponse +): void { + if (response._openStream) { + throw new Error( + "Stream wasn't closed properly. Please ensure the stream is closed after processing." 
+ ); + } + + validateLlmModuleResult(response._data.module_results?.llm); + + validateLlmModuleResult(response._data.orchestration_result); + + validateChoices(response._data.module_results?.output_unmasking); +} + +function validateLlmModuleResult( + llmModuleResult: LlmModuleResult | undefined +): void { + if (llmModuleResult) { + if (!llmModuleResult.usage) { + logger.warn('LlmModuleResult is missing usage information.'); + } + if (!llmModuleResult.choices || llmModuleResult.choices.length === 0) { + logger.warn('LlmModuleResult must contain at least one choice.'); + } + + validateChoices(llmModuleResult.choices); + } +} + +function validateChoices(choices: LlmChoice[] | undefined): void { + if (choices) { + for (const choice of choices) { + if (!choice.message) { + logger.warn('LlmChoice is missing message information.'); + } + if (!choice.finish_reason) { + logger.warn('LlmChoice is missing finish reason.'); + } + if (!choice.index && choice.index !== 0) { + logger.warn('LlmChoice must have a valid index.'); + } + validateMessage(choice.message); + } + } +} + +function validateMessage(message: ResponseChatMessage): void { + if (!message.role) { + logger.warn('Message is missing role information.'); + } + if (!message.content && !message.tool_calls) { + logger.warn('Message contains neither content nor tool calls.'); + } + + if (message.tool_calls) { + for (const toolCall of message.tool_calls) { + validateToolCall(toolCall); + } + } +} + +/** + * @internal + */ +export function validateToolCall(toolCall: Partial): void { + if (typeof toolCall.id !== 'string') { + logger.warn('ToolCall is missing id information.'); + } + if (typeof toolCall.function?.name !== 'string') { + logger.warn('ToolCall is missing function name information.'); + } + if (typeof toolCall.function?.arguments !== 'string') { + logger.warn('ToolCall is missing function arguments information.'); + } + + try { + JSON.parse(toolCall.function?.arguments ?? 
''); + } catch { + logger.warn('ToolCall arguments are not valid JSON.'); + } +} diff --git a/packages/orchestration/src/util/tool-calls.ts b/packages/orchestration/src/util/tool-calls.ts deleted file mode 100644 index 1129b0cff..000000000 --- a/packages/orchestration/src/util/tool-calls.ts +++ /dev/null @@ -1,78 +0,0 @@ -import type { - MessageToolCall, - ToolCallChunk -} from '../client/api/schema/index.js'; - -/** - * @internal - */ -export type ToolCallAccumulator = { - id?: string; - type: 'function'; - function: { - name?: string; - arguments?: string; - } & Record; -} & Record; - -/** - * @internal - * Check if the accumulator is a MessageToolCall. - */ -export function isMessageToolCall( - acc: ToolCallAccumulator -): acc is MessageToolCall { - return ( - typeof acc.id === 'string' && - typeof acc.function.name === 'string' && - typeof acc.function.arguments === 'string' - ); -} - -/** - * Merge a stream of ToolCallChunk into a single MessageToolCall. - * @throws If the final object is missing required fields. - * @internal - */ -export function mergeToolCallChunk( - chunk: ToolCallChunk, - acc?: ToolCallAccumulator -): ToolCallAccumulator { - const accumulator: ToolCallAccumulator = acc - ? 
{ ...acc } - : { - type: 'function', - function: {} - }; - - if (chunk.id) { - accumulator.id = chunk.id; - } - - // Merge any extra top‐level props - for (const key of Object.keys(chunk)) { - if (!['index', 'id', 'type', 'function'].includes(key)) { - accumulator[key] = chunk[key]; - } - } - - if (chunk.function) { - if (chunk.function.name) { - accumulator.function.name = chunk.function.name; - } - - if (chunk.function.arguments) { - accumulator.function.arguments = - (accumulator.function.arguments || '') + chunk.function.arguments; - } - - // Merge any extra function‐scoped fields - for (const key of Object.keys(chunk.function)) { - if (!['name', 'arguments'].includes(key)) { - accumulator.function[key] = (chunk.function as any)[key]; - } - } - } - - return accumulator; -} From 8131fb059a167ac238785527dbddfb89bf0b52ab Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 10 Jul 2025 13:01:39 +0200 Subject: [PATCH 12/43] create baseline --- packages/orchestration/src/util/stream.ts | 64 ++++++++++++++++++----- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index a09964e89..595320908 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -10,7 +10,8 @@ import type { ModuleResultsStreaming, OrchestrationStreamChunkResponse, OrchestrationStreamResponse, - ResponseChatMessage + ResponseChatMessage, + ToolCallChunk } from '../index.js'; const logger = createLogger({ @@ -98,18 +99,53 @@ function mergeLlmChoices( existing: LlmChoice[] | undefined, incoming: LlmChoiceStreaming[] | undefined ): LlmChoice[] { - return [ - ...(existing ?? []), - ...(incoming?.map(choice => ({ - ...choice, - message: choice.message - ? { - ...choice.message, - content: choice.message.content - } - : undefined - })) ?? []) - ] as LlmChoice[]; + const mergedChoices = [...existing ?? []]; + for(const choice of incoming ?? 
[]) { + const existingChoice = mergedChoices.find(c => c.index === choice.index); + if (existingChoice) { + // Merge existing choice with incoming choice + existingChoice.message = { + ...existingChoice.message, + ...choice.message + }; + } else { + // Add new choice + mergedChoices.push(transforStreamingChoice(choice)); + } + } + return mergedChoices; +} + +function transforStreamingChoice( + choice: LlmChoiceStreaming +): LlmChoice { + return { + index: choice.index, + message: { + role: 'assistant', + content: choice.delta.content, + tool_calls: transformStreamingToolCalls(choice.delta.tool_calls), + refusal: choice.delta.refusal, + }, + finish_reason: choice.finish_reason ?? '', + logprobs: choice.logprobs + }; +} + +function transformStreamingToolCalls( + toolCalls: ToolCallChunk[] | undefined +): MessageToolCall[] | undefined { + if(!toolCalls || toolCalls.length === 0) { + return undefined; + } + return toolCalls?.map(toolCall => ({ + id: toolCall.id ?? '', + type: toolCall.type ?? 'function', + function: { + name: toolCall.function?.name ?? '', + arguments: toolCall.function?.arguments ?? 
'' + } + })); } /** @@ -153,7 +189,7 @@ function validateChoices(choices: LlmChoice[] | undefined): void { logger.warn('LlmChoice is missing message information.'); } if (!choice.finish_reason) { - logger.warn('LlmChoice is missing finish reason.'); + logger.warn('LlmChoice is missing a finish reason.'); } if (!choice.index && choice.index !== 0) { logger.warn('LlmChoice must have a valid index.'); From e2193e8570699671037dd5a756749c6206e8b54b Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 10 Jul 2025 13:13:58 +0200 Subject: [PATCH 13/43] progress --- packages/orchestration/src/util/stream.ts | 48 ++++++++++++++++++++--- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 595320908..63051a351 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -1,5 +1,7 @@ import { createLogger } from '@sap-cloud-sdk/util'; import type { + ChatDelta, + ChoiceLogprobs, CompletionPostResponseStreaming, LlmChoice, LlmChoiceStreaming, @@ -104,19 +106,53 @@ function mergeLlmChoices( const existingChoice = mergedChoices.find(c => c.index === choice.index); if (existingChoice) { // Merge existing choice with incoming choice - existingChoice.message = { - ...existingChoice.message, - ...choice.message - }; + existingChoice.finish_reason = choice.finish_reason ?? existingChoice.finish_reason; + existingChoice.logprobs = mergeLogProbs(existingChoice.logprobs, choice.logprobs); + existingChoice.index = choice.index ?? 
existingChoice.index; + existingChoice.message = mergeMessage(existingChoice.message, choice.delta); } else { // Add new choice - mergedChoices.push(transforStreamingChoice(choice)); + mergedChoices.push(transformStreamingChoice(choice)); } } return mergedChoices; } -function transforStreamingChoice( +function mergeMessage( + existing: ResponseChatMessage | undefined, + incoming: ChatDelta | undefined +): ResponseChatMessage { + if (!incoming) { + return existing; + } + if (!existing) { + return incoming; + } + return { + role: incoming.role ?? existing.role, + content: [...(existing.content ?? []), ...(incoming.content ?? [])], + tool_calls: [...(existing.tool_calls ?? []), ...(incoming.tool_calls ?? [])], + refusal: [...(existing.refusal ?? []), ...(incoming.refusal ?? [])], + }; +} + +function mergeLogProbs( + existing: ChoiceLogprobs | undefined, + incoming: ChoiceLogprobs | undefined +): ChoiceLogprobs | undefined { + if (!incoming) { + return existing; + } + if(!existing) { + return incoming; + } + return { + content: [...(existing.content ?? []), ...(incoming.content ?? [])], + refusal: [...(existing.refusal ?? []), ...(incoming.refusal ?? [])], + }; +} + +function transformStreamingChoice( choice: LlmChoiceStreaming ): LlmChoice { return { From 670ae4802bb757ac86479f1c31151c765762c7d0 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 10 Jul 2025 13:20:23 +0200 Subject: [PATCH 14/43] lint --- packages/orchestration/src/util/stream.ts | 43 +++++++++++++++-------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 63051a351..6cae464d7 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -101,15 +101,22 @@ function mergeLlmChoices( existing: LlmChoice[] | undefined, incoming: LlmChoiceStreaming[] | undefined ): LlmChoice[] { - const mergedChoices = [...existing ?? []]; - for(const choice of incoming ?? 
[]) { + const mergedChoices = [...(existing ?? [])]; + for (const choice of incoming ?? []) { const existingChoice = mergedChoices.find(c => c.index === choice.index); if (existingChoice) { // Merge existing choice with incoming choice - existingChoice.finish_reason = choice.finish_reason ?? existingChoice.finish_reason; - existingChoice.logprobs = mergeLogProbs(existingChoice.logprobs, choice.logprobs); existingChoice.index = choice.index ?? existingChoice.index; - existingChoice.message = mergeMessage(existingChoice.message, choice.delta); + existingChoice.finish_reason = + choice.finish_reason ?? existingChoice.finish_reason; + existingChoice.logprobs = mergeLogProbs( + existingChoice.logprobs, + choice.logprobs + ); + existingChoice.message = mergeMessage( + existingChoice.message, + choice.delta + ); } else { // Add new choice mergedChoices.push(transformStreamingChoice(choice)); @@ -131,8 +138,11 @@ function mergeMessage( return { role: incoming.role ?? existing.role, content: [...(existing.content ?? []), ...(incoming.content ?? [])], - tool_calls: [...(existing.tool_calls ?? []), ...(incoming.tool_calls ?? [])], - refusal: [...(existing.refusal ?? []), ...(incoming.refusal ?? [])], + tool_calls: [ + ...(existing.tool_calls ?? []), + ...(incoming.tool_calls ?? []) + ], + refusal: [...(existing.refusal ?? []), ...(incoming.refusal ?? [])] }; } @@ -143,25 +153,23 @@ function mergeLogProbs( if (!incoming) { return existing; } - if(!existing) { + if (!existing) { return incoming; } return { content: [...(existing.content ?? []), ...(incoming.content ?? [])], - refusal: [...(existing.refusal ?? []), ...(incoming.refusal ?? [])], + refusal: [...(existing.refusal ?? []), ...(incoming.refusal ?? 
[])] }; } -function transformStreamingChoice( - choice: LlmChoiceStreaming -): LlmChoice { +function transformStreamingChoice(choice: LlmChoiceStreaming): LlmChoice { return { index: choice.index, message: { role: 'assistant', content: choice.delta.content, tool_calls: transformStreamingToolCalls(choice.delta.tool_calls), - refusal: choice.delta.refusal, + refusal: choice.delta.refusal }, finish_reason: choice.finish_reason ?? '', logprobs: choice.logprobs @@ -171,7 +179,7 @@ function transformStreamingChoice( function transformStreamingToolCalls( toolCalls: ToolCallChunk[] | undefined ): MessageToolCall[] | undefined { - if(!toolCalls || toolCalls.length === 0) { + if (!toolCalls || toolCalls.length === 0) { return undefined; } return toolCalls?.map(toolCall => ({ @@ -267,6 +275,11 @@ export function validateToolCall(toolCall: Partial): void { try { JSON.parse(toolCall.function?.arguments ?? ''); } catch { - logger.warn('ToolCall arguments are not valid JSON.'); + logger.warn( + 'ToolCall arguments are not valid JSON for tool: ' + + toolCall.function?.name || + toolCall.id || + 'unknown' + ); } } From 4b9b325f297dd7083379d16724e1840c2a74c744 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 10 Jul 2025 13:47:40 +0200 Subject: [PATCH 15/43] draft --- packages/orchestration/src/util/stream.ts | 57 +++++++++++++++++------ 1 file changed, 42 insertions(+), 15 deletions(-) diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 6cae464d7..e044862b4 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -126,26 +126,56 @@ function mergeLlmChoices( } function mergeMessage( - existing: ResponseChatMessage | undefined, + existing: ResponseChatMessage, incoming: ChatDelta | undefined ): ResponseChatMessage { if (!incoming) { return existing; } - if (!existing) { - return incoming; - } return { - role: incoming.role ?? existing.role, - content: [...(existing.content ?? 
[]), ...(incoming.content ?? [])], - tool_calls: [ - ...(existing.tool_calls ?? []), - ...(incoming.tool_calls ?? []) - ], - refusal: [...(existing.refusal ?? []), ...(incoming.refusal ?? [])] + role: existing.role, + content: existing.content + (incoming.content ?? ''), + tool_calls: mergeToolCalls(existing.tool_calls, incoming.tool_calls), + refusal: incoming.refusal ?? existing.refusal }; } +function mergeToolCalls( + existing: MessageToolCall[] | undefined, + incoming: ToolCallChunk[] | undefined +): MessageToolCall[] | undefined { + if (!incoming || incoming.length === 0) { + return existing; + } + if (!existing || existing.length === 0) { + return transformStreamingToolCalls(incoming); + } + const mergedToolCalls = [...(existing ?? [])]; + for (const toolCall of incoming) { + const existingToolCall = mergedToolCalls.find( + tc => tc.id === toolCall.id + ); + if (existingToolCall) { + // Merge existing tool call with incoming tool call + existingToolCall.function.name = + toolCall.function?.name ?? existingToolCall.function.name; + existingToolCall.function.arguments = + toolCall.function?.arguments ?? existingToolCall.function.arguments; + } else { + // Add new tool call + mergedToolCalls.push({ + id: toolCall.id ?? '', + type: toolCall.type ?? 'function', + function: { + name: toolCall.function?.name ?? '', + arguments: toolCall.function?.arguments ?? '' + } + }); + } + } + return mergedToolCalls.length > 0 ? 
mergedToolCalls : undefined; +} + function mergeLogProbs( existing: ChoiceLogprobs | undefined, incoming: ChoiceLogprobs | undefined @@ -258,10 +288,7 @@ function validateMessage(message: ResponseChatMessage): void { } } -/** - * @internal - */ -export function validateToolCall(toolCall: Partial): void { +function validateToolCall(toolCall: Partial): void { if (typeof toolCall.id !== 'string') { logger.warn('ToolCall is missing id information.'); } From 614c10076ea93693f34082af9a2818dcafa3f9da Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 10 Jul 2025 13:53:33 +0200 Subject: [PATCH 16/43] semi-final --- packages/orchestration/src/util/stream.ts | 26 +++++++++++------------ 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index e044862b4..370fb1d47 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -150,11 +150,12 @@ function mergeToolCalls( if (!existing || existing.length === 0) { return transformStreamingToolCalls(incoming); } - const mergedToolCalls = [...(existing ?? [])]; + const mergedToolCalls = [...existing]; for (const toolCall of incoming) { const existingToolCall = mergedToolCalls.find( tc => tc.id === toolCall.id ); + // TODO: THIS if (existingToolCall) { // Merge existing tool call with incoming tool call existingToolCall.function.name = @@ -163,17 +164,10 @@ function mergeToolCalls( toolCall.function?.arguments ?? existingToolCall.function.arguments; } else { // Add new tool call - mergedToolCalls.push({ - id: toolCall.id ?? '', - type: toolCall.type ?? 'function', - function: { - name: toolCall.function?.name ?? '', - arguments: toolCall.function?.arguments ?? '' - } - }); - } + mergedToolCalls.push(transformStreamingToolCall(toolCall)); + } } - return mergedToolCalls.length > 0 ? 
mergedToolCalls : undefined; + return mergedToolCalls; } function mergeLogProbs( @@ -212,14 +206,20 @@ function transformStreamingToolCalls( if (!toolCalls || toolCalls.length === 0) { return undefined; } - return toolCalls?.map(toolCall => ({ + return toolCalls?.map(toolCall => transformStreamingToolCall(toolCall)); +} + +function transformStreamingToolCall( + toolCall: ToolCallChunk +): MessageToolCall { + return { id: toolCall.id ?? '', type: toolCall.type ?? 'function', function: { name: toolCall.function?.name ?? '', arguments: toolCall.function?.arguments ?? '' } - })); + }; } /** From e4372fc691026a92b30aab82a8049233bf3ff21d Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 10 Jul 2025 14:02:12 +0200 Subject: [PATCH 17/43] merge function complete --- packages/orchestration/src/util/stream.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 370fb1d47..65a020d6d 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -155,16 +155,16 @@ function mergeToolCalls( const existingToolCall = mergedToolCalls.find( tc => tc.id === toolCall.id ); - // TODO: THIS if (existingToolCall) { // Merge existing tool call with incoming tool call + existingToolCall.id = toolCall.id ?? existingToolCall.id; existingToolCall.function.name = toolCall.function?.name ?? existingToolCall.function.name; existingToolCall.function.arguments = - toolCall.function?.arguments ?? existingToolCall.function.arguments; + existingToolCall.function.arguments + (toolCall.function?.arguments ?? 
''); } else { // Add new tool call - mergedToolCalls.push(transformStreamingToolCall(toolCall)); + mergedToolCalls.push(transformStreamingToolCall(toolCall)); } } return mergedToolCalls; From e3fe4c1aa5b4c1e7cd9587743d26d06518e9d40a Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 10 Jul 2025 14:12:15 +0200 Subject: [PATCH 18/43] add finish reason handler --- .../src/orchestration-stream-response.ts | 1 - packages/orchestration/src/util/stream.ts | 56 +++++++++++++++---- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/packages/orchestration/src/orchestration-stream-response.ts b/packages/orchestration/src/orchestration-stream-response.ts index 84405df3f..76f9e4776 100644 --- a/packages/orchestration/src/orchestration-stream-response.ts +++ b/packages/orchestration/src/orchestration-stream-response.ts @@ -57,7 +57,6 @@ export class OrchestrationStreamResponse { * @param choiceIndex - The index of the choice to parse. * @returns The message content. */ - // need to check for content filter hits in the post processing public getContent(choiceIndex = 0): string | undefined { if (!this._openStream) { const choice = this.findChoiceByIndex(choiceIndex); diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 65a020d6d..02de36975 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -107,8 +107,11 @@ function mergeLlmChoices( if (existingChoice) { // Merge existing choice with incoming choice existingChoice.index = choice.index ?? existingChoice.index; - existingChoice.finish_reason = - choice.finish_reason ?? 
existingChoice.finish_reason; + existingChoice.finish_reason = handleFinishReason( + existingChoice.finish_reason, + choice.finish_reason, + choice.index + ); existingChoice.logprobs = mergeLogProbs( existingChoice.logprobs, choice.logprobs @@ -152,20 +155,19 @@ function mergeToolCalls( } const mergedToolCalls = [...existing]; for (const toolCall of incoming) { - const existingToolCall = mergedToolCalls.find( - tc => tc.id === toolCall.id - ); + const existingToolCall = mergedToolCalls.find(tc => tc.id === toolCall.id); if (existingToolCall) { // Merge existing tool call with incoming tool call existingToolCall.id = toolCall.id ?? existingToolCall.id; existingToolCall.function.name = toolCall.function?.name ?? existingToolCall.function.name; existingToolCall.function.arguments = - existingToolCall.function.arguments + (toolCall.function?.arguments ?? ''); + existingToolCall.function.arguments + + (toolCall.function?.arguments ?? ''); } else { // Add new tool call - mergedToolCalls.push(transformStreamingToolCall(toolCall)); - } + mergedToolCalls.push(transformStreamingToolCall(toolCall)); + } } return mergedToolCalls; } @@ -186,6 +188,40 @@ function mergeLogProbs( }; } +function handleFinishReason( + existing: string | undefined, + incoming: string | undefined, + choiceIndex: number | undefined +): string { + if (!incoming) { + return existing ?? 
''; + } + + switch (incoming) { + case 'content_filter': + logger.error( + `Choice ${choiceIndex}: Stream finished with content filter hit.` + ); + break; + case 'length': + logger.error( + `Choice ${choiceIndex}: Stream finished with token length exceeded.` + ); + break; + case 'stop': + case 'tool_calls': + case 'function_call': + logger.debug(`Choice ${choiceIndex}: Stream finished.`); + break; + default: + logger.error( + `Choice ${choiceIndex}: Stream finished with unknown reason '${incoming}'.` + ); + } + + return incoming; +} + function transformStreamingChoice(choice: LlmChoiceStreaming): LlmChoice { return { index: choice.index, @@ -209,9 +245,7 @@ function transformStreamingToolCalls( return toolCalls?.map(toolCall => transformStreamingToolCall(toolCall)); } -function transformStreamingToolCall( - toolCall: ToolCallChunk -): MessageToolCall { +function transformStreamingToolCall(toolCall: ToolCallChunk): MessageToolCall { return { id: toolCall.id ?? '', type: toolCall.type ?? 'function', From 381fd5b6e65376414e2c206a83b060286c1e6324 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 10 Jul 2025 14:44:39 +0200 Subject: [PATCH 19/43] clean-up --- packages/orchestration/src/util/stream.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 02de36975..26daad7fa 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -158,7 +158,6 @@ function mergeToolCalls( const existingToolCall = mergedToolCalls.find(tc => tc.id === toolCall.id); if (existingToolCall) { // Merge existing tool call with incoming tool call - existingToolCall.id = toolCall.id ?? existingToolCall.id; existingToolCall.function.name = toolCall.function?.name ?? 
existingToolCall.function.name; existingToolCall.function.arguments = From ad5678b5a58e73499d59e499c96fa79d131a4668 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 10 Jul 2025 14:48:54 +0200 Subject: [PATCH 20/43] adjust validation types --- packages/orchestration/src/util/stream.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 26daad7fa..a4dafc80b 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -275,7 +275,7 @@ export function validateResponse( } function validateLlmModuleResult( - llmModuleResult: LlmModuleResult | undefined + llmModuleResult: Partial | undefined ): void { if (llmModuleResult) { if (!llmModuleResult.usage) { @@ -289,11 +289,13 @@ function validateLlmModuleResult( } } -function validateChoices(choices: LlmChoice[] | undefined): void { +function validateChoices(choices: Partial[] | undefined): void { if (choices) { for (const choice of choices) { if (!choice.message) { logger.warn('LlmChoice is missing message information.'); + } else { + validateMessage(choice.message); } if (!choice.finish_reason) { logger.warn('LlmChoice is missing a finish reason.'); @@ -301,12 +303,11 @@ function validateChoices(choices: LlmChoice[] | undefined): void { if (!choice.index && choice.index !== 0) { logger.warn('LlmChoice must have a valid index.'); } - validateMessage(choice.message); } } } -function validateMessage(message: ResponseChatMessage): void { +function validateMessage(message: Partial): void { if (!message.role) { logger.warn('Message is missing role information.'); } From c4835644fc762a64a4c7b79216738bf4f45ea612 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Mon, 14 Jul 2025 14:05:43 +0200 Subject: [PATCH 21/43] lint --- packages/orchestration/src/util/stream.ts | 5 ++++- tests/e2e-tests/src/orchestration.test.ts | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) 
diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index a4dafc80b..0fa578fbb 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -155,7 +155,9 @@ function mergeToolCalls( } const mergedToolCalls = [...existing]; for (const toolCall of incoming) { - const existingToolCall = mergedToolCalls.find(tc => tc.id === toolCall.id); + const existingToolCall = mergedToolCalls.find( + tc => tc.index === toolCall.index + ); if (existingToolCall) { // Merge existing tool call with incoming tool call existingToolCall.function.name = @@ -246,6 +248,7 @@ function transformStreamingToolCalls( function transformStreamingToolCall(toolCall: ToolCallChunk): MessageToolCall { return { + index: toolCall.index, id: toolCall.id ?? '', type: toolCall.type ?? 'function', function: { diff --git a/tests/e2e-tests/src/orchestration.test.ts b/tests/e2e-tests/src/orchestration.test.ts index 76d588f3d..d37d7e12a 100644 --- a/tests/e2e-tests/src/orchestration.test.ts +++ b/tests/e2e-tests/src/orchestration.test.ts @@ -197,6 +197,7 @@ describe('orchestration', () => { "name": "add", }, "id": "mock_id", + "index": 0, "type": "function", }, { @@ -205,6 +206,7 @@ describe('orchestration', () => { "name": "add", }, "id": "mock_id", + "index": 1, "type": "function", }, ] From 7c0dd77c4692028d9f41ebfb577853d495cf6124 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Mon, 14 Jul 2025 14:15:03 +0200 Subject: [PATCH 22/43] add index --- packages/orchestration/src/orchestration-client.test.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/orchestration/src/orchestration-client.test.ts b/packages/orchestration/src/orchestration-client.test.ts index 354e22ca1..648e258a7 100644 --- a/packages/orchestration/src/orchestration-client.test.ts +++ b/packages/orchestration/src/orchestration-client.test.ts @@ -842,6 +842,7 @@ describe('orchestration service client', () => { "name": "add", }, "id": 
"call_HPgxxSmD2ctYfcJ3gp1JBc7i", + "index": 0, "type": "function", }, { @@ -850,6 +851,7 @@ describe('orchestration service client', () => { "name": "multiply", }, "id": "call_PExve0Dd9hxD8hOk4Uhr1yhO", + "index": 1, "type": "function", }, ] From b8a23e450ef863f8b3ff96cb67c8eabb5d08e8b8 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Mon, 14 Jul 2025 14:30:54 +0200 Subject: [PATCH 23/43] update test --- packages/orchestration/src/orchestration-stream.test.ts | 8 ++++---- packages/orchestration/src/util/stream.ts | 3 +++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/orchestration/src/orchestration-stream.test.ts b/packages/orchestration/src/orchestration-stream.test.ts index 870877dac..a37f6a627 100644 --- a/packages/orchestration/src/orchestration-stream.test.ts +++ b/packages/orchestration/src/orchestration-stream.test.ts @@ -53,13 +53,13 @@ describe('Orchestration chat completion stream', () => { it('should process the finish reasons', async () => { const logger = createLogger({ package: 'orchestration', - messageContext: 'orchestration-chat-completion-stream' + messageContext: 'stream-util' }); const debugSpy = jest.spyOn(logger, 'debug'); const asyncGeneratorChunk = OrchestrationStream._processChunk( originalChatCompletionStream ); - const asyncGeneratorFinishReason = OrchestrationStream._processFinishReason( + const asyncGeneratorFinishReason = OrchestrationStream._processOrchestrationStreamChunkResponse( new OrchestrationStream(() => asyncGeneratorChunk, new AbortController()), new OrchestrationStreamResponse() ); @@ -73,13 +73,13 @@ describe('Orchestration chat completion stream', () => { it('should process the token usage', async () => { const logger = createLogger({ package: 'orchestration', - messageContext: 'orchestration-chat-completion-stream' + messageContext: 'stream-util' }); const debugSpy = jest.spyOn(logger, 'debug'); const asyncGeneratorChunk = OrchestrationStream._processChunk( originalChatCompletionStream ); - const 
asyncGeneratorTokenUsage = OrchestrationStream._processTokenUsage( + const asyncGeneratorTokenUsage = OrchestrationStream._processOrchestrationStreamChunkResponse( new OrchestrationStream(() => asyncGeneratorChunk, new AbortController()), new OrchestrationStreamResponse() ); diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 0fa578fbb..5aab9c969 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -89,6 +89,9 @@ function mergeTokenUsage( | { prompt_tokens: number; completion_tokens: number; total_tokens: number } | undefined ): { prompt_tokens: number; completion_tokens: number; total_tokens: number } { + if(incoming) { + logger.debug(`Token usage: ${JSON.stringify(incoming)}`); + } return { prompt_tokens: incoming?.prompt_tokens ?? existing?.prompt_tokens ?? 0, completion_tokens: From 137382bae0f96624e9a37875262f2222a0cb33cf Mon Sep 17 00:00:00 2001 From: cloud-sdk-js Date: Mon, 14 Jul 2025 12:33:04 +0000 Subject: [PATCH 24/43] fix: Changes from lint --- .../src/orchestration-stream.test.ts | 24 ++++++++++++------- packages/orchestration/src/util/stream.ts | 2 +- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/packages/orchestration/src/orchestration-stream.test.ts b/packages/orchestration/src/orchestration-stream.test.ts index a37f6a627..6bf4b3372 100644 --- a/packages/orchestration/src/orchestration-stream.test.ts +++ b/packages/orchestration/src/orchestration-stream.test.ts @@ -59,10 +59,14 @@ describe('Orchestration chat completion stream', () => { const asyncGeneratorChunk = OrchestrationStream._processChunk( originalChatCompletionStream ); - const asyncGeneratorFinishReason = OrchestrationStream._processOrchestrationStreamChunkResponse( - new OrchestrationStream(() => asyncGeneratorChunk, new AbortController()), - new OrchestrationStreamResponse() - ); + const asyncGeneratorFinishReason = + 
OrchestrationStream._processOrchestrationStreamChunkResponse( + new OrchestrationStream( + () => asyncGeneratorChunk, + new AbortController() + ), + new OrchestrationStreamResponse() + ); for await (const chunk of asyncGeneratorFinishReason) { expect(chunk).toBeDefined(); @@ -79,10 +83,14 @@ describe('Orchestration chat completion stream', () => { const asyncGeneratorChunk = OrchestrationStream._processChunk( originalChatCompletionStream ); - const asyncGeneratorTokenUsage = OrchestrationStream._processOrchestrationStreamChunkResponse( - new OrchestrationStream(() => asyncGeneratorChunk, new AbortController()), - new OrchestrationStreamResponse() - ); + const asyncGeneratorTokenUsage = + OrchestrationStream._processOrchestrationStreamChunkResponse( + new OrchestrationStream( + () => asyncGeneratorChunk, + new AbortController() + ), + new OrchestrationStreamResponse() + ); for await (const chunk of asyncGeneratorTokenUsage) { expect(chunk).toBeDefined(); diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 5aab9c969..6fa422d04 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -89,7 +89,7 @@ function mergeTokenUsage( | { prompt_tokens: number; completion_tokens: number; total_tokens: number } | undefined ): { prompt_tokens: number; completion_tokens: number; total_tokens: number } { - if(incoming) { + if (incoming) { logger.debug(`Token usage: ${JSON.stringify(incoming)}`); } return { From 7d80530c19ddf6e42ff5b9185421e0bc25339618 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Mon, 14 Jul 2025 14:40:51 +0200 Subject: [PATCH 25/43] remove redundant assignment --- packages/orchestration/src/util/stream.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 5aab9c969..51f95255b 100644 --- a/packages/orchestration/src/util/stream.ts +++ 
b/packages/orchestration/src/util/stream.ts @@ -109,7 +109,6 @@ function mergeLlmChoices( const existingChoice = mergedChoices.find(c => c.index === choice.index); if (existingChoice) { // Merge existing choice with incoming choice - existingChoice.index = choice.index ?? existingChoice.index; existingChoice.finish_reason = handleFinishReason( existingChoice.finish_reason, choice.finish_reason, From 324cb2403ae031709276a4bb2120d9b44bf500a7 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Tue, 15 Jul 2025 16:05:32 +0200 Subject: [PATCH 26/43] add unit tests --- .../orchestration/src/orchestration-stream.ts | 2 +- .../orchestration/src/util/stream.test.ts | 711 ++++++++++++++++++ packages/orchestration/src/util/stream.ts | 4 +- 3 files changed, 714 insertions(+), 3 deletions(-) create mode 100644 packages/orchestration/src/util/stream.test.ts diff --git a/packages/orchestration/src/orchestration-stream.ts b/packages/orchestration/src/orchestration-stream.ts index 26a23db50..ebcd38adb 100644 --- a/packages/orchestration/src/orchestration-stream.ts +++ b/packages/orchestration/src/orchestration-stream.ts @@ -50,7 +50,7 @@ export class OrchestrationStream extends SseStream { ); } for await (const chunk of stream) { - mergeStreamResponse(chunk.data, response); + mergeStreamResponse(response, chunk.data); yield chunk; } } diff --git a/packages/orchestration/src/util/stream.test.ts b/packages/orchestration/src/util/stream.test.ts new file mode 100644 index 000000000..7d9e150fa --- /dev/null +++ b/packages/orchestration/src/util/stream.test.ts @@ -0,0 +1,711 @@ +import { createLogger } from '@sap-cloud-sdk/util'; +import { jest } from '@jest/globals'; +import { OrchestrationStreamResponse } from '../index.js'; +import { mergeStreamResponse, validateResponse } from './stream.js'; +import type { + CompletionPostResponseStreaming, + OrchestrationStreamChunkResponse +} from '../index.js'; + +const llmBase = { + id: 'orchestration-id-1', + object: 'chat.completion.chunk', + 
created: 1752575616, + model: 'gpt-4o-2024-08-06', + system_fingerprint: 'fp_ee1d74bde0', + usage: { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 } +}; + +describe('stream-util', () => { + describe('mergeStreamResponse', () => { + it('merges basic stream response properties', () => { + const response = + new OrchestrationStreamResponse(); + const chunk: CompletionPostResponseStreaming = { + request_id: 'test-request-123', + module_results: {}, + orchestration_result: { + ...llmBase, + choices: [], + usage: { prompt_tokens: 10, completion_tokens: 5, total_tokens: 15 } + } + }; + + mergeStreamResponse(response, chunk); + + expect(response._data.request_id).toBe('test-request-123'); + expect(response._data.orchestration_result).toBeDefined(); + expect(response._data.orchestration_result?.usage).toEqual({ + prompt_tokens: 10, + completion_tokens: 5, + total_tokens: 15 + }); + }); + + it('merges module results with llm module', () => { + const chunk: CompletionPostResponseStreaming = { + request_id: 'test-request-123', + module_results: { + llm: { + ...llmBase, + usage: { + prompt_tokens: 20, + completion_tokens: 10, + total_tokens: 30 + }, + choices: [ + { + index: 0, + delta: { content: 'Hello' } + } + ] + } + } + }; + + const response = + new OrchestrationStreamResponse(); + + mergeStreamResponse(response, chunk); + + expect(response._data.module_results?.llm?.usage).toEqual({ + prompt_tokens: 20, + completion_tokens: 10, + total_tokens: 30 + }); + expect(response._data.module_results?.llm?.choices).toEqual([ + { + index: 0, + message: { + role: 'assistant', + content: 'Hello' + }, + finish_reason: '' + } + ]); + }); + + it('merges output_unmasking module results', () => { + const chunk: CompletionPostResponseStreaming = { + request_id: 'test-request-123', + module_results: { + output_unmasking: [ + { + index: 0, + delta: { content: 'Unmasked content' } + } + ] + } + }; + + const response = + new OrchestrationStreamResponse(); + + 
mergeStreamResponse(response, chunk); + + expect(response._data.module_results?.output_unmasking).toHaveLength(1); + expect( + response._data.module_results?.output_unmasking?.[0].message.content + ).toBe('Unmasked content'); + }); + }); + + describe('token usage merging', () => { + it('merges token usage with existing values', () => { + const chunk: CompletionPostResponseStreaming = { + request_id: 'test-request-123', + orchestration_result: { + ...llmBase, + usage: { prompt_tokens: 15, completion_tokens: 8, total_tokens: 23 }, + choices: [] + } + }; + + const response = + new OrchestrationStreamResponse(); + response._data = { + request_id: 'test-request-123', + orchestration_result: { + ...llmBase, + usage: { prompt_tokens: 1, completion_tokens: 2, total_tokens: 3 }, + choices: [] + } + }; + + mergeStreamResponse(response, chunk); + + expect(response._data.orchestration_result?.usage).toEqual({ + prompt_tokens: 15, + completion_tokens: 8, + total_tokens: 23 + }); + }); + + it('handles missing token usage gracefully', () => { + const chunk: CompletionPostResponseStreaming = { + request_id: 'test-request-123', + orchestration_result: { + ...llmBase, + choices: [] + } + }; + + delete chunk.orchestration_result?.usage; + + const response = + new OrchestrationStreamResponse(); + response._data = { + request_id: 'test-request-123', + orchestration_result: { + ...llmBase, + usage: { prompt_tokens: 10, completion_tokens: 5, total_tokens: 15 }, + choices: [] + } + }; + + mergeStreamResponse(response, chunk); + + expect(response._data.orchestration_result?.usage).toEqual({ + prompt_tokens: 10, + completion_tokens: 5, + total_tokens: 15 + }); + }); + }); + + describe('choice merging', () => { + it('merges content from multiple chunks', () => { + const response = + new OrchestrationStreamResponse(); + response._data = { + request_id: 'test-request-123', + orchestration_result: { + ...llmBase, + choices: [ + { + index: 0, + message: { + role: 'assistant', + content: 'Hello' 
+ }, + finish_reason: '', + logprobs: { + content: [], + refusal: [] + } + } + ] + } + }; + + const chunk: CompletionPostResponseStreaming = { + request_id: 'test-request-123', + orchestration_result: { + ...llmBase, + choices: [ + { + index: 0, + delta: { content: ' World' }, + finish_reason: 'stop' + } + ] + } + }; + + mergeStreamResponse(response, chunk); + + expect( + response._data.orchestration_result?.choices[0].message.content + ).toBe('Hello World'); + expect( + response._data.orchestration_result?.choices[0].finish_reason + ).toBe('stop'); + }); + + it('adds new choice when index does not exist', () => { + const response = + new OrchestrationStreamResponse(); + response._data = { + request_id: 'test-request-123', + orchestration_result: { + ...llmBase, + choices: [ + { + index: 0, + message: { + role: 'assistant', + content: 'First choice' + }, + finish_reason: 'stop' + } + ] + } + }; + + const chunk: CompletionPostResponseStreaming = { + request_id: 'test-request-123', + orchestration_result: { + ...llmBase, + choices: [ + { + index: 1, + delta: { content: 'Second choice' }, + finish_reason: 'stop' + } + ] + } + }; + + mergeStreamResponse(response, chunk); + + expect(response._data.orchestration_result?.choices).toHaveLength(2); + expect( + response._data.orchestration_result?.choices[1].message.content + ).toBe('Second choice'); + }); + + it('handles finish reasons correctly', () => { + const response = + new OrchestrationStreamResponse(); + response._data = { + request_id: 'test-request-123', + orchestration_result: { + ...llmBase, + choices: [ + { + index: 0, + message: { + role: 'assistant', + content: 'Test' + }, + finish_reason: '' + } + ] + } + }; + + const chunk: CompletionPostResponseStreaming = { + request_id: 'test-request-123', + orchestration_result: { + ...llmBase, + choices: [ + { + index: 0, + delta: { content: '' }, + finish_reason: 'content_filter' + } + ] + } + }; + + mergeStreamResponse(response, chunk); + + expect( + 
response._data.orchestration_result?.choices[0].finish_reason + ).toBe('content_filter'); + }); + }); + + describe('tool call merging', () => { + it('merges tool call arguments from multiple chunks', () => { + const response = + new OrchestrationStreamResponse(); + response._data = { + request_id: 'test-request-123', + orchestration_result: { + ...llmBase, + choices: [ + { + index: 0, + message: { + role: 'assistant', + content: '', + tool_calls: [ + { + index: 0, + id: 'tool-call-1', + type: 'function', + function: { + name: 'test_function', + arguments: '{"param1":' + } + } + ] + }, + finish_reason: '' + } + ] + } + }; + + const chunk: CompletionPostResponseStreaming = { + request_id: 'test-request-123', + orchestration_result: { + ...llmBase, + choices: [ + { + index: 0, + delta: { + content: '', + tool_calls: [ + { + index: 0, + id: 'tool-call-1', + type: 'function', + function: { + name: 'test_function', + arguments: '"value1"}' + } + } + ] + } + } + ] + } + }; + + mergeStreamResponse(response, chunk); + + expect( + response._data.orchestration_result?.choices[0].message.tool_calls?.[0] + .function.arguments + ).toBe('{"param1":"value1"}'); + }); + + it('adds new tool call when index does not exist', () => { + const response = + new OrchestrationStreamResponse(); + response._data = { + request_id: 'test-request-123', + module_results: {}, + orchestration_result: { + ...llmBase, + choices: [ + { + index: 0, + message: { + role: 'assistant', + content: '', + tool_calls: [ + { + index: 0, + id: 'tool-call-1', + type: 'function', + function: { + name: 'first_function', + arguments: '{"param1":"value1"}' + } + } + ], + refusal: undefined + }, + finish_reason: '', + logprobs: undefined + } + ] + } + }; + + const chunk: CompletionPostResponseStreaming = { + request_id: 'test-request-123', + orchestration_result: { + ...llmBase, + choices: [ + { + index: 0, + delta: { + content: '', + tool_calls: [ + { + index: 1, + id: 'tool-call-2', + type: 'function', + function: { 
+ name: 'second_function', + arguments: '{"param2":"value2"}' + } + } + ] + } + } + ] + } + }; + + mergeStreamResponse(response, chunk); + + expect( + response._data.orchestration_result?.choices[0].message.tool_calls + ).toHaveLength(2); + expect( + response._data.orchestration_result?.choices[0].message.tool_calls?.[1] + .function.name + ).toBe('second_function'); + }); + }); + + describe('logprobs merging', () => { + it('merges logprobs content arrays', () => { + const response = + new OrchestrationStreamResponse(); + response._data = { + request_id: 'test-request-123', + module_results: {}, + orchestration_result: { + ...llmBase, + choices: [ + { + index: 0, + message: { + role: 'assistant', + content: 'Test' + }, + finish_reason: '', + logprobs: { + content: [{ token: 'Test', logprob: -0.1 }], + refusal: [] + } + } + ] + } + }; + + const chunk: CompletionPostResponseStreaming = { + request_id: 'test-request-123', + module_results: {}, + orchestration_result: { + ...llmBase, + choices: [ + { + index: 0, + delta: { content: ' message' }, + logprobs: { + content: [{ token: ' message', logprob: -0.2 }], + refusal: [] + } + } + ] + } + }; + + mergeStreamResponse(response, chunk); + + expect( + response._data.orchestration_result?.choices[0]?.logprobs?.content + ).toHaveLength(2); + expect( + response._data.orchestration_result?.choices?.[0]?.logprobs + ?.content?.[1]?.token ?? 
'' + ).toBe(' message'); + }); + + it('handles missing logprobs gracefully', () => { + const response = + new OrchestrationStreamResponse(); + response._data = { + request_id: 'test-request-123', + module_results: {}, + orchestration_result: { + ...llmBase, + choices: [ + { + index: 0, + message: { + role: 'assistant', + content: 'Test' + }, + finish_reason: '' + } + ] + } + }; + + const chunk: CompletionPostResponseStreaming = { + request_id: 'test-request-123', + orchestration_result: { + ...llmBase, + choices: [ + { + index: 0, + delta: { content: ' message' }, + logprobs: { + content: [{ token: ' message', logprob: -0.2 }], + refusal: [] + } + } + ] + } + }; + + mergeStreamResponse(response, chunk); + + expect( + response._data.orchestration_result?.choices[0].logprobs?.content + ).toHaveLength(1); + expect( + response._data.orchestration_result?.choices[0].logprobs?.content?.[0] + ?.token ?? '' + ).toBe(' message'); + }); + }); + + describe('validateResponse', () => { + it('throws error when stream is still open', () => { + const response = + new OrchestrationStreamResponse(); + response._data = { + request_id: 'test-request-123', + module_results: {}, + orchestration_result: undefined + }; + + expect(() => validateResponse(response)).toThrow( + "Stream wasn't closed properly. Please ensure the stream is closed after processing." 
+ ); + }); + + it('validates successfully with proper response structure', () => { + const response = + new OrchestrationStreamResponse(); + response._data = { + request_id: 'test-request-123', + module_results: { + llm: { + ...llmBase, + usage: { + prompt_tokens: 10, + completion_tokens: 5, + total_tokens: 15 + }, + choices: [ + { + index: 0, + message: { + role: 'assistant', + content: 'Test message', + tool_calls: undefined, + refusal: undefined + }, + finish_reason: 'stop', + logprobs: undefined + } + ] + } + }, + orchestration_result: { + ...llmBase, + usage: { prompt_tokens: 10, completion_tokens: 5, total_tokens: 15 }, + choices: [ + { + index: 0, + message: { + role: 'assistant', + content: 'Test message', + tool_calls: undefined, + refusal: undefined + }, + finish_reason: 'stop', + logprobs: undefined + } + ] + } + }; + + response._openStream = false; + + expect(() => validateResponse(response)).not.toThrow(); + }); + + it('validates tool calls with proper JSON arguments', () => { + const response = + new OrchestrationStreamResponse(); + response._data = { + request_id: 'test-request-123', + module_results: {}, + orchestration_result: { + ...llmBase, + usage: { prompt_tokens: 10, completion_tokens: 5, total_tokens: 15 }, + choices: [ + { + index: 0, + message: { + role: 'assistant', + content: '', + tool_calls: [ + { + index: 0, + id: 'tool-call-1', + type: 'function', + function: { + name: 'test_function', + arguments: '{"param1":"value1"}' + } + } + ], + refusal: undefined + }, + finish_reason: 'tool_calls', + logprobs: undefined + } + ] + } + }; + + response._openStream = false; + + expect(() => validateResponse(response)).not.toThrow(); + }); + + it('logs warning for invalid tool call arguments', () => { + const logger = createLogger({ + package: 'orchestration', + messageContext: 'stream-util' + }); + const warnSpy = jest.spyOn(logger, 'warn'); + + const response = + new OrchestrationStreamResponse(); + response._data = { + request_id: 
'test-request-123', + module_results: {}, + orchestration_result: { + ...llmBase, + usage: { prompt_tokens: 10, completion_tokens: 5, total_tokens: 15 }, + choices: [ + { + index: 0, + message: { + role: 'assistant', + content: '', + tool_calls: [ + { + index: 0, + id: 'tool-call-1', + type: 'function', + function: { + name: 'test_function', + arguments: '{"param1":invalid_json}' + } + } + ], + refusal: undefined + }, + finish_reason: 'tool_calls', + logprobs: undefined + } + ] + } + }; + + response._openStream = false; + + validateResponse(response); + + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining( + 'ToolCall arguments are not valid JSON for tool: test_function' + ) + ); + }); + }); +}); diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 9b881411b..6ed74577b 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -25,8 +25,8 @@ const logger = createLogger({ * @internal */ export function mergeStreamResponse( - chunk: CompletionPostResponseStreaming, - response: OrchestrationStreamResponse + response: OrchestrationStreamResponse, + chunk: CompletionPostResponseStreaming ): void { const data = response._data; data.request_id = chunk.request_id; From 4151545eba33039e67d0973598c5eb0e42af6a82 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Wed, 16 Jul 2025 09:31:45 +0200 Subject: [PATCH 27/43] review --- .../src/orchestration-stream-response.ts | 74 ++++++++++--------- .../orchestration/src/util/stream.test.ts | 41 ++++------ 2 files changed, 51 insertions(+), 64 deletions(-) diff --git a/packages/orchestration/src/orchestration-stream-response.ts b/packages/orchestration/src/orchestration-stream-response.ts index 76f9e4776..f932cc536 100644 --- a/packages/orchestration/src/orchestration-stream-response.ts +++ b/packages/orchestration/src/orchestration-stream-response.ts @@ -14,12 +14,6 @@ const logger = createLogger({ messageContext: 
'orchestration-stream-response' }); -function openStreamWarning(missingData: string): void { - logger.warn( - `The stream is still open, the ${missingData} data is not available yet. Please wait until the stream is closed.` - ); -} - /** * Orchestration stream response. */ @@ -33,10 +27,10 @@ export class OrchestrationStreamResponse { * @returns The token usage for the response. */ public getTokenUsage(): TokenUsage | undefined { - if (!this._openStream) { - return this._data.orchestration_result?.usage; + if (this.isStreamOpen()) { + return; } - openStreamWarning('token usage'); + return this._data.orchestration_result?.usage; } /** @@ -45,10 +39,10 @@ export class OrchestrationStreamResponse { * @returns The finish reason for the specified choice index. */ public getFinishReason(choiceIndex = 0): string | undefined { - if (!this._openStream) { - return this.findChoiceByIndex(choiceIndex)?.finish_reason; + if (this.isStreamOpen()) { + return; } - openStreamWarning('finish reason'); + return this.findChoiceByIndex(choiceIndex)?.finish_reason; } /** @@ -58,11 +52,11 @@ export class OrchestrationStreamResponse { * @returns The message content. */ public getContent(choiceIndex = 0): string | undefined { - if (!this._openStream) { - const choice = this.findChoiceByIndex(choiceIndex); - return choice?.message?.content; + if (this.isStreamOpen()) { + return; } - openStreamWarning('content'); + const choice = this.findChoiceByIndex(choiceIndex); + return choice?.message?.content; } /** @@ -71,11 +65,11 @@ export class OrchestrationStreamResponse { * @returns The message tool calls. 
*/ public getToolCalls(choiceIndex = 0): MessageToolCalls | undefined { - if (!this._openStream) { - const choice = this.findChoiceByIndex(choiceIndex); - return choice?.message?.tool_calls; + if (this.isStreamOpen()) { + return; } - openStreamWarning('tool calls'); + const choice = this.findChoiceByIndex(choiceIndex); + return choice?.message?.tool_calls; } /** @@ -84,11 +78,11 @@ export class OrchestrationStreamResponse { * @returns The refusal string. */ public getRefusal(choiceIndex = 0): string | undefined { - if (!this._openStream) { - const choice = this.findChoiceByIndex(choiceIndex); - return choice?.message?.refusal; + if (this.isStreamOpen()) { + return; } - openStreamWarning('refusal message'); + const choice = this.findChoiceByIndex(choiceIndex); + return choice?.message?.refusal; } /** @@ -97,13 +91,12 @@ export class OrchestrationStreamResponse { * @returns A list of all messages. */ public getAllMessages(choiceIndex = 0): ChatMessages | undefined { - if (!this._openStream) { - const messages: ChatMessage[] = - this._data.module_results?.templating ?? []; - const content = this.findChoiceByIndex(choiceIndex)?.message; - return content ? [...messages, content] : messages; + if (this.isStreamOpen()) { + return; } - openStreamWarning('messages'); + const messages: ChatMessage[] = this._data.module_results?.templating ?? []; + const content = this.findChoiceByIndex(choiceIndex)?.message; + return content ? 
[...messages, content] : messages; } /** @@ -115,17 +108,17 @@ export class OrchestrationStreamResponse { public getAssistantMessage( choiceIndex = 0 ): AssistantChatMessage | undefined { - if (!this._openStream) { - return this.findChoiceByIndex(choiceIndex)?.message; + if (this.isStreamOpen()) { + return; } - openStreamWarning('assistant message'); + return this.findChoiceByIndex(choiceIndex)?.message; } public getResponse(): CompletionPostResponse | undefined { - if (!this._openStream) { - return this._data as CompletionPostResponse; + if (this.isStreamOpen()) { + return; } - openStreamWarning('response'); + return this._data as CompletionPostResponse; } get stream(): OrchestrationStream { @@ -143,6 +136,15 @@ export class OrchestrationStreamResponse { return this.getChoices().find((c: { index: number }) => c.index === index); } + private isStreamOpen(): boolean { + if (this._openStream) { + logger.warn( + 'The stream is still open, the requested data is not available yet. Please wait until the stream is closed.' 
+ ); + } + return this._openStream; + } + /** * @internal */ diff --git a/packages/orchestration/src/util/stream.test.ts b/packages/orchestration/src/util/stream.test.ts index 7d9e150fa..22c3aaf48 100644 --- a/packages/orchestration/src/util/stream.test.ts +++ b/packages/orchestration/src/util/stream.test.ts @@ -396,11 +396,9 @@ describe('stream-util', () => { arguments: '{"param1":"value1"}' } } - ], - refusal: undefined + ] }, - finish_reason: '', - logprobs: undefined + finish_reason: '' } ] } @@ -462,8 +460,7 @@ describe('stream-util', () => { }, finish_reason: '', logprobs: { - content: [{ token: 'Test', logprob: -0.1 }], - refusal: [] + content: [{ token: 'Test', logprob: -0.1 }] } } ] @@ -480,8 +477,7 @@ describe('stream-util', () => { index: 0, delta: { content: ' message' }, logprobs: { - content: [{ token: ' message', logprob: -0.2 }], - refusal: [] + content: [{ token: ' message', logprob: -0.2 }] } } ] @@ -529,8 +525,7 @@ describe('stream-util', () => { index: 0, delta: { content: ' message' }, logprobs: { - content: [{ token: ' message', logprob: -0.2 }], - refusal: [] + content: [{ token: ' message', logprob: -0.2 }] } } ] @@ -582,12 +577,9 @@ describe('stream-util', () => { index: 0, message: { role: 'assistant', - content: 'Test message', - tool_calls: undefined, - refusal: undefined + content: 'Test message' }, - finish_reason: 'stop', - logprobs: undefined + finish_reason: 'stop' } ] } @@ -600,12 +592,9 @@ describe('stream-util', () => { index: 0, message: { role: 'assistant', - content: 'Test message', - tool_calls: undefined, - refusal: undefined + content: 'Test message' }, - finish_reason: 'stop', - logprobs: undefined + finish_reason: 'stop' } ] } @@ -641,11 +630,9 @@ describe('stream-util', () => { arguments: '{"param1":"value1"}' } } - ], - refusal: undefined + ] }, - finish_reason: 'tool_calls', - logprobs: undefined + finish_reason: 'tool_calls' } ] } @@ -687,11 +674,9 @@ describe('stream-util', () => { arguments: 
'{"param1":invalid_json}' } } - ], - refusal: undefined + ] }, - finish_reason: 'tool_calls', - logprobs: undefined + finish_reason: 'tool_calls' } ] } From 821a788b3241fad5f3365d1335b6a34e1ca797ae Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Wed, 16 Jul 2025 11:00:18 +0200 Subject: [PATCH 28/43] add stacktrace to log --- packages/orchestration/src/orchestration-stream-response.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/orchestration/src/orchestration-stream-response.ts b/packages/orchestration/src/orchestration-stream-response.ts index f932cc536..947b8657b 100644 --- a/packages/orchestration/src/orchestration-stream-response.ts +++ b/packages/orchestration/src/orchestration-stream-response.ts @@ -138,8 +138,10 @@ export class OrchestrationStreamResponse { private isStreamOpen(): boolean { if (this._openStream) { + const stacktrace = new Error().stack; logger.warn( - 'The stream is still open, the requested data is not available yet. Please wait until the stream is closed.' + `The stream is still open, the requested data is not available yet. Please wait until the stream is closed. 
+ Stacktrace: ${stacktrace}` ); } return this._openStream; From 74edb01877716ff034a29cf17477a61900197763 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 17 Jul 2025 14:07:53 +0200 Subject: [PATCH 29/43] add try catch --- .../orchestration/src/orchestration-client.ts | 83 ++++++++++++------- 1 file changed, 53 insertions(+), 30 deletions(-) diff --git a/packages/orchestration/src/orchestration-client.ts b/packages/orchestration/src/orchestration-client.ts index ae47293de..f857a078b 100644 --- a/packages/orchestration/src/orchestration-client.ts +++ b/packages/orchestration/src/orchestration-client.ts @@ -73,21 +73,31 @@ export class OrchestrationClient { options?: StreamOptions, requestConfig?: CustomRequestConfig ): Promise> { - if (typeof this.config === 'string' && options) { - logger.warn( - 'Stream options are not supported when using a JSON module config.' + try { + if (typeof this.config === 'string' && options) { + logger.warn( + 'Stream options are not supported when using a JSON module config.' 
+ ); + } + + return this.createStreamResponse( + { + prompt, + requestConfig, + stream: true, + streamOptions: options + }, + controller ); - } + } catch (error) { + logger.error('Error while creating stream response:', error); - return this.createStreamResponse( - { - prompt, - requestConfig, - stream: true, - streamOptions: options - }, - controller - ); + if (!controller.signal.aborted) { + controller.abort(); + } + + throw error; + } } private async executeRequest(options: RequestOptions): Promise { @@ -131,25 +141,38 @@ export class OrchestrationClient { const response = new OrchestrationStreamResponse(); - const streamResponse = await this.executeRequest({ - ...options, - requestConfig: { - ...options.requestConfig, - responseType: 'stream', - signal: controller.signal - } - }); + try { + const streamResponse = await this.executeRequest({ + ...options, + requestConfig: { + ...options.requestConfig, + responseType: 'stream', + signal: controller.signal + } + }); + + const stream = OrchestrationStream._create(streamResponse, controller); + response.stream = stream + ._pipe(OrchestrationStream._processChunk) + ._pipe( + OrchestrationStream._processOrchestrationStreamChunkResponse, + response + ) + ._pipe(OrchestrationStream._processStreamEnd, response); + + return response; + } catch (error) { + logger.error( + 'Error while creating orchestration stream response:', + error + ); - const stream = OrchestrationStream._create(streamResponse, controller); - response.stream = stream - ._pipe(OrchestrationStream._processChunk) - ._pipe( - OrchestrationStream._processOrchestrationStreamChunkResponse, - response - ) - ._pipe(OrchestrationStream._processStreamEnd, response); + if (!controller.signal.aborted) { + controller.abort(); + } - return response; + throw error; + } } /** From 7a4e51acafea49397d009a114072524e7f88d0f4 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 17 Jul 2025 14:31:39 +0200 Subject: [PATCH 30/43] add error handling unit tests --- 
.../src/orchestration-client.test.ts | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/packages/orchestration/src/orchestration-client.test.ts b/packages/orchestration/src/orchestration-client.test.ts index 648e258a7..9d4eafc23 100644 --- a/packages/orchestration/src/orchestration-client.test.ts +++ b/packages/orchestration/src/orchestration-client.test.ts @@ -857,4 +857,70 @@ describe('orchestration service client', () => { ] `); }); + describe('OrchestrationClient Stream Error Handling', () => { + it('should abort controller and re-throw error when network request fails', async () => { + const config: OrchestrationModuleConfig = { + llm: { + model_name: 'gpt-4o', + model_params: {} + }, + templating: { + template: [ + { + role: 'user', + content: 'Test prompt' + } + ] + } + }; + + const controller = new AbortController(); + + // Mock network failure + mockInference( + { + data: constructCompletionPostRequest(config, undefined, true) + }, + { + status: 500, + data: 'Internal Server Error' + }, + { + url: 'inference/deployments/1234/completion' + } + ); + + const client = new OrchestrationClient(config); + + await expect(client.stream(undefined, controller)).rejects.toThrow(); + expect(controller.signal.aborted).toBe(true); + }); + + it('should handle aborted requests gracefully', async () => { + const config: OrchestrationModuleConfig = { + llm: { + model_name: 'gpt-4o', + model_params: {} + }, + templating: { + template: [ + { + role: 'user', + content: 'Test prompt' + } + ] + } + }; + + const controller = new AbortController(); + + // Abort immediately + controller.abort(); + + const client = new OrchestrationClient(config); + + await expect(client.stream(undefined, controller)).rejects.toThrow(); + expect(controller.signal.aborted).toBe(true); + }); + }); }); From 0b90a0a9cce697b5727d5db4c75be54c474dd56b Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Thu, 17 Jul 2025 14:35:22 +0200 Subject: [PATCH 31/43] trigger pipeline From 
9978744fd31484e23c302786732ae91900179ec3 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Mon, 21 Jul 2025 14:04:05 +0200 Subject: [PATCH 32/43] adjust error handling --- .../orchestration/src/orchestration-client.ts | 16 ++++------------ .../src/orchestration-stream-response.ts | 12 ++---------- 2 files changed, 6 insertions(+), 22 deletions(-) diff --git a/packages/orchestration/src/orchestration-client.ts b/packages/orchestration/src/orchestration-client.ts index f857a078b..30315786b 100644 --- a/packages/orchestration/src/orchestration-client.ts +++ b/packages/orchestration/src/orchestration-client.ts @@ -90,12 +90,8 @@ export class OrchestrationClient { controller ); } catch (error) { - logger.error('Error while creating stream response:', error); - - if (!controller.signal.aborted) { - controller.abort(); - } - + logger.error('Error while creating the stream response:', error); + controller.abort(); throw error; } } @@ -163,14 +159,10 @@ export class OrchestrationClient { return response; } catch (error) { logger.error( - 'Error while creating orchestration stream response:', + 'Error while creating and processing the orchestration stream response:', error ); - - if (!controller.signal.aborted) { - controller.abort(); - } - + controller.abort(); throw error; } } diff --git a/packages/orchestration/src/orchestration-stream-response.ts b/packages/orchestration/src/orchestration-stream-response.ts index 947b8657b..1f3612ded 100644 --- a/packages/orchestration/src/orchestration-stream-response.ts +++ b/packages/orchestration/src/orchestration-stream-response.ts @@ -1,4 +1,3 @@ -import { createLogger } from '@sap-cloud-sdk/util'; import type { AssistantChatMessage, ChatMessage, @@ -9,11 +8,6 @@ import type { } from './client/api/schema/index.js'; import type { OrchestrationStream } from './orchestration-stream.js'; -const logger = createLogger({ - package: 'orchestration', - messageContext: 'orchestration-stream-response' -}); - /** * Orchestration stream response. 
*/ @@ -138,10 +132,8 @@ export class OrchestrationStreamResponse { private isStreamOpen(): boolean { if (this._openStream) { - const stacktrace = new Error().stack; - logger.warn( - `The stream is still open, the requested data is not available yet. Please wait until the stream is closed. - Stacktrace: ${stacktrace}` + throw Error( + 'The stream is still open, the requested data is not available yet. Please wait until the stream is closed.' ); } return this._openStream; From 0948e528a17a3a3bbbab83533d4b56e71ada50d9 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Mon, 21 Jul 2025 15:13:56 +0200 Subject: [PATCH 33/43] add tracing information to validator --- .../orchestration/src/util/stream.test.ts | 4 +- packages/orchestration/src/util/stream.ts | 78 +++++++++++++------ 2 files changed, 55 insertions(+), 27 deletions(-) diff --git a/packages/orchestration/src/util/stream.test.ts b/packages/orchestration/src/util/stream.test.ts index 22c3aaf48..0f8d69913 100644 --- a/packages/orchestration/src/util/stream.test.ts +++ b/packages/orchestration/src/util/stream.test.ts @@ -687,9 +687,7 @@ describe('stream-util', () => { validateResponse(response); expect(warnSpy).toHaveBeenCalledWith( - expect.stringContaining( - 'ToolCall arguments are not valid JSON for tool: test_function' - ) + 'orchestration: LlmChoice 0: ToolCall arguments are not valid JSON for tool: test_function' ); }); }); diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 6ed74577b..37ea1c8ac 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -272,80 +272,110 @@ export function validateResponse( ); } - validateLlmModuleResult(response._data.module_results?.llm); + validateLlmModuleResult(response._data.module_results?.llm, 'llm'); - validateLlmModuleResult(response._data.orchestration_result); + validateLlmModuleResult(response._data.orchestration_result, 'orchestration'); - 
validateChoices(response._data.module_results?.output_unmasking); + validateChoices( + response._data.module_results?.output_unmasking, + 'output_unmasking' + ); } function validateLlmModuleResult( - llmModuleResult: Partial | undefined + llmModuleResult: Partial | undefined, + sourceModule: string ): void { if (llmModuleResult) { if (!llmModuleResult.usage) { - logger.warn('LlmModuleResult is missing usage information.'); + logger.warn( + `${sourceModule}: LlmModuleResult is missing usage information.` + ); } if (!llmModuleResult.choices || llmModuleResult.choices.length === 0) { - logger.warn('LlmModuleResult must contain at least one choice.'); + logger.warn( + `${sourceModule}: LlmModuleResult must contain at least one choice.` + ); } - validateChoices(llmModuleResult.choices); + validateChoices(llmModuleResult.choices, sourceModule); } } -function validateChoices(choices: Partial[] | undefined): void { +function validateChoices( + choices: Partial[] | undefined, + sourceModule: string +): void { if (choices) { for (const choice of choices) { if (!choice.message) { - logger.warn('LlmChoice is missing message information.'); + logger.warn( + `${sourceModule}: LlmChoice ${choice.index} is missing a message.` + ); } else { - validateMessage(choice.message); + validateMessage(choice.message, sourceModule, choice.index); } if (!choice.finish_reason) { - logger.warn('LlmChoice is missing a finish reason.'); + logger.warn( + `${sourceModule}: LlmChoice ${choice.index} is missing a finish reason.` + ); } if (!choice.index && choice.index !== 0) { - logger.warn('LlmChoice must have a valid index.'); + logger.warn(`${sourceModule}: LlmChoice must have a valid index.`); } } } } -function validateMessage(message: Partial): void { +function validateMessage( + message: Partial, + sourceModule: string, + sourceChoice: number | undefined +): void { if (!message.role) { - logger.warn('Message is missing role information.'); + logger.warn( + `${sourceModule}: LlmChoice 
${sourceChoice}: message is missing role.` + ); } if (!message.content && !message.tool_calls) { - logger.warn('Message contains neither content nor tool calls.'); + logger.warn( + `${sourceModule}: LlmChoice ${sourceChoice}: message contains neither content nor tool calls.` + ); } if (message.tool_calls) { for (const toolCall of message.tool_calls) { - validateToolCall(toolCall); + validateToolCall(toolCall, sourceModule, sourceChoice); } } } -function validateToolCall(toolCall: Partial): void { +function validateToolCall( + toolCall: Partial, + sourceModule: string, + sourceChoice: number | undefined +): void { if (typeof toolCall.id !== 'string') { - logger.warn('ToolCall is missing id information.'); + logger.warn( + `${sourceModule}: LlmChoice ${sourceChoice}: ToolCall is missing id.` + ); } if (typeof toolCall.function?.name !== 'string') { - logger.warn('ToolCall is missing function name information.'); + logger.warn( + `${sourceModule}: LlmChoice ${sourceChoice}: ToolCall is missing function name.` + ); } if (typeof toolCall.function?.arguments !== 'string') { - logger.warn('ToolCall is missing function arguments information.'); + logger.warn( + `${sourceModule}: LlmChoice ${sourceChoice}: ToolCall is missing function arguments.` + ); } try { JSON.parse(toolCall.function?.arguments ?? 
''); } catch { logger.warn( - 'ToolCall arguments are not valid JSON for tool: ' + - toolCall.function?.name || - toolCall.id || - 'unknown' + `${sourceModule}: LlmChoice ${sourceChoice}: ToolCall arguments are not valid JSON for tool: ${toolCall.function?.name || toolCall.id || 'unknown'}` ); } } From abd47c01ca525e8f196b68ce28b85ed741f5c1d5 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Mon, 21 Jul 2025 16:14:41 +0200 Subject: [PATCH 34/43] remove outer try catch --- .../src/orchestration-client.test.ts | 12 ++++- .../orchestration/src/orchestration-client.ts | 49 ++++++++----------- 2 files changed, 31 insertions(+), 30 deletions(-) diff --git a/packages/orchestration/src/orchestration-client.test.ts b/packages/orchestration/src/orchestration-client.test.ts index 9d4eafc23..e489ba9ba 100644 --- a/packages/orchestration/src/orchestration-client.test.ts +++ b/packages/orchestration/src/orchestration-client.test.ts @@ -859,6 +859,12 @@ describe('orchestration service client', () => { }); describe('OrchestrationClient Stream Error Handling', () => { it('should abort controller and re-throw error when network request fails', async () => { + const logger = createLogger({ + package: 'orchestration', + messageContext: 'orchestration-client' + }); + + const errorSpy = jest.spyOn(logger, 'error'); const config: OrchestrationModuleConfig = { llm: { model_name: 'gpt-4o', @@ -883,7 +889,7 @@ describe('orchestration service client', () => { }, { status: 500, - data: 'Internal Server Error' + data: { error: 'Internal Server Error' } }, { url: 'inference/deployments/1234/completion' @@ -894,6 +900,10 @@ describe('orchestration service client', () => { await expect(client.stream(undefined, controller)).rejects.toThrow(); expect(controller.signal.aborted).toBe(true); + expect(errorSpy).toHaveBeenCalledWith( + 'Error while creating the stream response:', + expect.any(Error) + ); }); it('should handle aborted requests gracefully', async () => { diff --git 
a/packages/orchestration/src/orchestration-client.ts b/packages/orchestration/src/orchestration-client.ts index 30315786b..2ded257ff 100644 --- a/packages/orchestration/src/orchestration-client.ts +++ b/packages/orchestration/src/orchestration-client.ts @@ -80,7 +80,7 @@ export class OrchestrationClient { ); } - return this.createStreamResponse( + return await this.createStreamResponse( { prompt, requestConfig, @@ -137,34 +137,25 @@ export class OrchestrationClient { const response = new OrchestrationStreamResponse(); - try { - const streamResponse = await this.executeRequest({ - ...options, - requestConfig: { - ...options.requestConfig, - responseType: 'stream', - signal: controller.signal - } - }); - - const stream = OrchestrationStream._create(streamResponse, controller); - response.stream = stream - ._pipe(OrchestrationStream._processChunk) - ._pipe( - OrchestrationStream._processOrchestrationStreamChunkResponse, - response - ) - ._pipe(OrchestrationStream._processStreamEnd, response); - - return response; - } catch (error) { - logger.error( - 'Error while creating and processing the orchestration stream response:', - error - ); - controller.abort(); - throw error; - } + const streamResponse = await this.executeRequest({ + ...options, + requestConfig: { + ...options.requestConfig, + responseType: 'stream', + signal: controller.signal + } + }); + + const stream = OrchestrationStream._create(streamResponse, controller); + response.stream = stream + ._pipe(OrchestrationStream._processChunk) + ._pipe( + OrchestrationStream._processOrchestrationStreamChunkResponse, + response + ) + ._pipe(OrchestrationStream._processStreamEnd, response); + + return response; } /** From ceb5bbcef1c2a4dfedccdb886489f5d55ab857ad Mon Sep 17 00:00:00 2001 From: KavithaSiva Date: Thu, 24 Jul 2025 16:10:20 +0200 Subject: [PATCH 35/43] chore: address review comment --- packages/orchestration/src/orchestration-client.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git 
a/packages/orchestration/src/orchestration-client.test.ts b/packages/orchestration/src/orchestration-client.test.ts index e489ba9ba..d50e970fa 100644 --- a/packages/orchestration/src/orchestration-client.test.ts +++ b/packages/orchestration/src/orchestration-client.test.ts @@ -930,7 +930,6 @@ describe('orchestration service client', () => { const client = new OrchestrationClient(config); await expect(client.stream(undefined, controller)).rejects.toThrow(); - expect(controller.signal.aborted).toBe(true); }); }); }); From c5d2f77c29578914352f9d8293bf9079de7ad0e4 Mon Sep 17 00:00:00 2001 From: KavithaSiva Date: Thu, 24 Jul 2025 16:18:43 +0200 Subject: [PATCH 36/43] chore: address more review comments --- packages/orchestration/src/orchestration-client.test.ts | 2 +- packages/orchestration/src/orchestration-client.ts | 1 - packages/orchestration/src/util/stream.ts | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/orchestration/src/orchestration-client.test.ts b/packages/orchestration/src/orchestration-client.test.ts index d50e970fa..372958b9b 100644 --- a/packages/orchestration/src/orchestration-client.test.ts +++ b/packages/orchestration/src/orchestration-client.test.ts @@ -906,7 +906,7 @@ describe('orchestration service client', () => { ); }); - it('should handle aborted requests gracefully', async () => { + it('should throw error when stream is called with already aborted controller', async () => { const config: OrchestrationModuleConfig = { llm: { model_name: 'gpt-4o', diff --git a/packages/orchestration/src/orchestration-client.ts b/packages/orchestration/src/orchestration-client.ts index 2ded257ff..a1662ae9f 100644 --- a/packages/orchestration/src/orchestration-client.ts +++ b/packages/orchestration/src/orchestration-client.ts @@ -90,7 +90,6 @@ export class OrchestrationClient { controller ); } catch (error) { - logger.error('Error while creating the stream response:', error); controller.abort(); throw error; } diff --git 
a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 37ea1c8ac..4003cd151 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -194,7 +194,7 @@ function mergeLogProbs( function handleFinishReason( existing: string | undefined, incoming: string | undefined, - choiceIndex: number | undefined + choiceIndex: number ): string { if (!incoming) { return existing ?? ''; From 8c64a8acc3f740d443c66859539410e0d6bdda7a Mon Sep 17 00:00:00 2001 From: KavithaSiva Date: Thu, 24 Jul 2025 16:32:23 +0200 Subject: [PATCH 37/43] chore: remove line --- packages/orchestration/src/util/stream.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 4003cd151..ea2e6d85e 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -266,12 +266,6 @@ function transformStreamingToolCall(toolCall: ToolCallChunk): MessageToolCall { export function validateResponse( response: OrchestrationStreamResponse ): void { - if (response._openStream) { - throw new Error( - "Stream wasn't closed properly. Please ensure the stream is closed after processing." 
- ); - } - validateLlmModuleResult(response._data.module_results?.llm, 'llm'); validateLlmModuleResult(response._data.orchestration_result, 'orchestration'); From cfea9ef833d6d97fe21bba7a98afdd5a59d9ef3e Mon Sep 17 00:00:00 2001 From: KavithaSiva Date: Thu, 24 Jul 2025 16:39:29 +0200 Subject: [PATCH 38/43] chore: remove redundant check --- packages/orchestration/src/util/stream.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index ea2e6d85e..484b5dd66 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -314,9 +314,6 @@ function validateChoices( `${sourceModule}: LlmChoice ${choice.index} is missing a finish reason.` ); } - if (!choice.index && choice.index !== 0) { - logger.warn(`${sourceModule}: LlmChoice must have a valid index.`); - } } } } From caf23c0a6c84dee5c4615268ac30d5e412c0a9db Mon Sep 17 00:00:00 2001 From: KavithaSiva Date: Thu, 24 Jul 2025 17:08:39 +0200 Subject: [PATCH 39/43] chore: add changeset --- .changeset/when-lizards-fly.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/when-lizards-fly.md diff --git a/.changeset/when-lizards-fly.md b/.changeset/when-lizards-fly.md new file mode 100644 index 000000000..e467f3232 --- /dev/null +++ b/.changeset/when-lizards-fly.md @@ -0,0 +1,5 @@ +--- +'@sap-ai-sdk/orchestration': minor +--- + +[Improvement] Add utility functions `getContent()`, `getRefusal()`, `getAllMessages()`, `getAssistantMessage()`, and `getResponse()` to stream response. 
\ No newline at end of file From c9acd39567b25135f7f0a5690ad304e5c9cd8340 Mon Sep 17 00:00:00 2001 From: KavithaSiva Date: Thu, 24 Jul 2025 17:15:59 +0200 Subject: [PATCH 40/43] chore: remove test --- packages/orchestration/src/util/stream.test.ts | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/packages/orchestration/src/util/stream.test.ts b/packages/orchestration/src/util/stream.test.ts index 0f8d69913..31e15464f 100644 --- a/packages/orchestration/src/util/stream.test.ts +++ b/packages/orchestration/src/util/stream.test.ts @@ -545,20 +545,6 @@ describe('stream-util', () => { }); describe('validateResponse', () => { - it('throws error when stream is still open', () => { - const response = - new OrchestrationStreamResponse(); - response._data = { - request_id: 'test-request-123', - module_results: {}, - orchestration_result: undefined - }; - - expect(() => validateResponse(response)).toThrow( - "Stream wasn't closed properly. Please ensure the stream is closed after processing." 
- ); - }); - it('validates successfully with proper response structure', () => { const response = new OrchestrationStreamResponse(); From 09ee480d7e29c43a322bbd99c2b0760abf58d9b4 Mon Sep 17 00:00:00 2001 From: KavithaSiva Date: Fri, 25 Jul 2025 09:47:11 +0200 Subject: [PATCH 41/43] chore: fix failing test --- .../orchestration/src/orchestration-client.test.ts | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/packages/orchestration/src/orchestration-client.test.ts b/packages/orchestration/src/orchestration-client.test.ts index 372958b9b..dbfef3311 100644 --- a/packages/orchestration/src/orchestration-client.test.ts +++ b/packages/orchestration/src/orchestration-client.test.ts @@ -859,12 +859,6 @@ describe('orchestration service client', () => { }); describe('OrchestrationClient Stream Error Handling', () => { it('should abort controller and re-throw error when network request fails', async () => { - const logger = createLogger({ - package: 'orchestration', - messageContext: 'orchestration-client' - }); - - const errorSpy = jest.spyOn(logger, 'error'); const config: OrchestrationModuleConfig = { llm: { model_name: 'gpt-4o', @@ -899,11 +893,6 @@ describe('orchestration service client', () => { const client = new OrchestrationClient(config); await expect(client.stream(undefined, controller)).rejects.toThrow(); - expect(controller.signal.aborted).toBe(true); - expect(errorSpy).toHaveBeenCalledWith( - 'Error while creating the stream response:', - expect.any(Error) - ); }); it('should throw error when stream is called with already aborted controller', async () => { From a5c02fa28123682ed924d206437d01a7f0c4928b Mon Sep 17 00:00:00 2001 From: KavithaSiva Date: Fri, 25 Jul 2025 10:23:43 +0200 Subject: [PATCH 42/43] chore: remove some mandatory validations --- packages/orchestration/src/util/stream.ts | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts 
index 484b5dd66..0b7ae0a2d 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -302,11 +302,7 @@ function validateChoices( ): void { if (choices) { for (const choice of choices) { - if (!choice.message) { - logger.warn( - `${sourceModule}: LlmChoice ${choice.index} is missing a message.` - ); - } else { + if (choice.message) { validateMessage(choice.message, sourceModule, choice.index); } if (!choice.finish_reason) { @@ -323,11 +319,6 @@ function validateMessage( sourceModule: string, sourceChoice: number | undefined ): void { - if (!message.role) { - logger.warn( - `${sourceModule}: LlmChoice ${sourceChoice}: message is missing role.` - ); - } if (!message.content && !message.tool_calls) { logger.warn( `${sourceModule}: LlmChoice ${sourceChoice}: message contains neither content nor tool calls.` From ede796c074353691f6b97934fe297cbd16b5c637 Mon Sep 17 00:00:00 2001 From: tomfrenken Date: Fri, 25 Jul 2025 13:25:43 +0200 Subject: [PATCH 43/43] remove validation & tests --- .../orchestration/src/orchestration-stream.ts | 3 +- .../orchestration/src/util/stream.test.ts | 138 +----------------- packages/orchestration/src/util/stream.ts | 102 ------------- 3 files changed, 2 insertions(+), 241 deletions(-) diff --git a/packages/orchestration/src/orchestration-stream.ts b/packages/orchestration/src/orchestration-stream.ts index ebcd38adb..4c138018e 100644 --- a/packages/orchestration/src/orchestration-stream.ts +++ b/packages/orchestration/src/orchestration-stream.ts @@ -1,6 +1,6 @@ import { SseStream } from '@sap-ai-sdk/core'; import { OrchestrationStreamChunkResponse } from './orchestration-stream-chunk-response.js'; -import { mergeStreamResponse, validateResponse } from './util/index.js'; +import { mergeStreamResponse } from './util/index.js'; import type { CompletionPostResponseStreaming } from './client/api/schema/index.js'; import type { HttpResponse } from '@sap-cloud-sdk/http-client'; import type {
OrchestrationStreamResponse } from './orchestration-stream-response.js'; @@ -67,7 +67,6 @@ export class OrchestrationStream extends SseStream { } response._openStream = false; - validateResponse(response); } /** diff --git a/packages/orchestration/src/util/stream.test.ts b/packages/orchestration/src/util/stream.test.ts index 31e15464f..555b4824a 100644 --- a/packages/orchestration/src/util/stream.test.ts +++ b/packages/orchestration/src/util/stream.test.ts @@ -1,7 +1,5 @@ -import { createLogger } from '@sap-cloud-sdk/util'; -import { jest } from '@jest/globals'; import { OrchestrationStreamResponse } from '../index.js'; -import { mergeStreamResponse, validateResponse } from './stream.js'; +import { mergeStreamResponse } from './stream.js'; import type { CompletionPostResponseStreaming, OrchestrationStreamChunkResponse @@ -543,138 +541,4 @@ describe('stream-util', () => { ).toBe(' message'); }); }); - - describe('validateResponse', () => { - it('validates successfully with proper response structure', () => { - const response = - new OrchestrationStreamResponse(); - response._data = { - request_id: 'test-request-123', - module_results: { - llm: { - ...llmBase, - usage: { - prompt_tokens: 10, - completion_tokens: 5, - total_tokens: 15 - }, - choices: [ - { - index: 0, - message: { - role: 'assistant', - content: 'Test message' - }, - finish_reason: 'stop' - } - ] - } - }, - orchestration_result: { - ...llmBase, - usage: { prompt_tokens: 10, completion_tokens: 5, total_tokens: 15 }, - choices: [ - { - index: 0, - message: { - role: 'assistant', - content: 'Test message' - }, - finish_reason: 'stop' - } - ] - } - }; - - response._openStream = false; - - expect(() => validateResponse(response)).not.toThrow(); - }); - - it('validates tool calls with proper JSON arguments', () => { - const response = - new OrchestrationStreamResponse(); - response._data = { - request_id: 'test-request-123', - module_results: {}, - orchestration_result: { - ...llmBase, - usage: { 
prompt_tokens: 10, completion_tokens: 5, total_tokens: 15 }, - choices: [ - { - index: 0, - message: { - role: 'assistant', - content: '', - tool_calls: [ - { - index: 0, - id: 'tool-call-1', - type: 'function', - function: { - name: 'test_function', - arguments: '{"param1":"value1"}' - } - } - ] - }, - finish_reason: 'tool_calls' - } - ] - } - }; - - response._openStream = false; - - expect(() => validateResponse(response)).not.toThrow(); - }); - - it('logs warning for invalid tool call arguments', () => { - const logger = createLogger({ - package: 'orchestration', - messageContext: 'stream-util' - }); - const warnSpy = jest.spyOn(logger, 'warn'); - - const response = - new OrchestrationStreamResponse(); - response._data = { - request_id: 'test-request-123', - module_results: {}, - orchestration_result: { - ...llmBase, - usage: { prompt_tokens: 10, completion_tokens: 5, total_tokens: 15 }, - choices: [ - { - index: 0, - message: { - role: 'assistant', - content: '', - tool_calls: [ - { - index: 0, - id: 'tool-call-1', - type: 'function', - function: { - name: 'test_function', - arguments: '{"param1":invalid_json}' - } - } - ] - }, - finish_reason: 'tool_calls' - } - ] - } - }; - - response._openStream = false; - - validateResponse(response); - - expect(warnSpy).toHaveBeenCalledWith( - 'orchestration: LlmChoice 0: ToolCall arguments are not valid JSON for tool: test_function' - ); - }); - }); }); diff --git a/packages/orchestration/src/util/stream.ts b/packages/orchestration/src/util/stream.ts index 0b7ae0a2d..a030193f2 100644 --- a/packages/orchestration/src/util/stream.ts +++ b/packages/orchestration/src/util/stream.ts @@ -259,105 +259,3 @@ function transformStreamingToolCall(toolCall: ToolCallChunk): MessageToolCall { } }; } - -/** - * @internal - */ -export function validateResponse( - response: OrchestrationStreamResponse -): void { - validateLlmModuleResult(response._data.module_results?.llm, 'llm'); - - 
validateLlmModuleResult(response._data.orchestration_result, 'orchestration'); - - validateChoices( - response._data.module_results?.output_unmasking, - 'output_unmasking' - ); -} - -function validateLlmModuleResult( - llmModuleResult: Partial | undefined, - sourceModule: string -): void { - if (llmModuleResult) { - if (!llmModuleResult.usage) { - logger.warn( - `${sourceModule}: LlmModuleResult is missing usage information.` - ); - } - if (!llmModuleResult.choices || llmModuleResult.choices.length === 0) { - logger.warn( - `${sourceModule}: LlmModuleResult must contain at least one choice.` - ); - } - - validateChoices(llmModuleResult.choices, sourceModule); - } -} - -function validateChoices( - choices: Partial[] | undefined, - sourceModule: string -): void { - if (choices) { - for (const choice of choices) { - if (choice.message) { - validateMessage(choice.message, sourceModule, choice.index); - } - if (!choice.finish_reason) { - logger.warn( - `${sourceModule}: LlmChoice ${choice.index} is missing a finish reason.` - ); - } - } - } -} - -function validateMessage( - message: Partial, - sourceModule: string, - sourceChoice: number | undefined -): void { - if (!message.content && !message.tool_calls) { - logger.warn( - `${sourceModule}: LlmChoice ${sourceChoice}: message contains neither content nor tool calls.` - ); - } - - if (message.tool_calls) { - for (const toolCall of message.tool_calls) { - validateToolCall(toolCall, sourceModule, sourceChoice); - } - } -} - -function validateToolCall( - toolCall: Partial, - sourceModule: string, - sourceChoice: number | undefined -): void { - if (typeof toolCall.id !== 'string') { - logger.warn( - `${sourceModule}: LlmChoice ${sourceChoice}: ToolCall is missing id.` - ); - } - if (typeof toolCall.function?.name !== 'string') { - logger.warn( - `${sourceModule}: LlmChoice ${sourceChoice}: ToolCall is missing function name.` - ); - } - if (typeof toolCall.function?.arguments !== 'string') { - logger.warn( - 
`${sourceModule}: LlmChoice ${sourceChoice}: ToolCall is missing function arguments.` - ); - } - - try { - JSON.parse(toolCall.function?.arguments ?? ''); - } catch { - logger.warn( - `${sourceModule}: LlmChoice ${sourceChoice}: ToolCall arguments are not valid JSON for tool: ${toolCall.function?.name || toolCall.id || 'unknown'}` - ); - } -}