Skip to content

feat: Add utility functionality to get the entire stream response #852

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 48 commits into from
Jul 25, 2025
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
712953c
save intermediate state
tomfrenken Jul 7, 2025
b99552c
merge with main
tomfrenken Jul 7, 2025
784d2bd
checkpoint
tomfrenken Jul 8, 2025
b2da91b
create utility function
tomfrenken Jul 8, 2025
a942d54
checkpoint
tomfrenken Jul 8, 2025
66a1c30
replicate utility
tomfrenken Jul 9, 2025
11da539
start utility functions
tomfrenken Jul 9, 2025
831e861
refactor
tomfrenken Jul 9, 2025
8092d5f
save
tomfrenken Jul 10, 2025
1c276fe
checkpoint
tomfrenken Jul 10, 2025
cf30502
lint
tomfrenken Jul 10, 2025
e88e267
lint
tomfrenken Jul 10, 2025
8131fb0
create baseline
tomfrenken Jul 10, 2025
e2193e8
progress
tomfrenken Jul 10, 2025
670ae48
lint
tomfrenken Jul 10, 2025
4b9b325
draft
tomfrenken Jul 10, 2025
614c100
semi-final
tomfrenken Jul 10, 2025
e4372fc
merge function complete
tomfrenken Jul 10, 2025
e3fe4c1
add finish reason handler
tomfrenken Jul 10, 2025
381fd5b
clean-up
tomfrenken Jul 10, 2025
ad5678b
adjust validation types
tomfrenken Jul 10, 2025
c483564
lint
tomfrenken Jul 14, 2025
7c0dd77
add index
tomfrenken Jul 14, 2025
b8a23e4
update test
tomfrenken Jul 14, 2025
137382b
fix: Changes from lint
Jul 14, 2025
7d80530
remove redundant assignment
tomfrenken Jul 14, 2025
c535dd6
Merge branch 'process-module-results' of https://github.com/SAP/ai-sd…
tomfrenken Jul 14, 2025
324cb24
add unit tests
tomfrenken Jul 15, 2025
4151545
review
tomfrenken Jul 16, 2025
821a788
add stacktrace to log
tomfrenken Jul 16, 2025
74edb01
add try catch
tomfrenken Jul 17, 2025
7a4e51a
add error handling unit tests
tomfrenken Jul 17, 2025
3885a39
Merge branch 'main' of https://github.com/SAP/ai-sdk-js into process-…
tomfrenken Jul 17, 2025
0b90a0a
trigger pipeline
tomfrenken Jul 17, 2025
9978744
adjust error handling
tomfrenken Jul 21, 2025
0948e52
add tracing information to validator
tomfrenken Jul 21, 2025
abd47c0
remove outer try catch
tomfrenken Jul 21, 2025
e3fec88
Merge branch 'main' into process-module-results
KavithaSiva Jul 24, 2025
ceb5bbc
chore: address review comment
KavithaSiva Jul 24, 2025
c5d2f77
chore: address more review comments
KavithaSiva Jul 24, 2025
8c64a8a
chore: remove line
KavithaSiva Jul 24, 2025
cfea9ef
chore: remove redundant check
KavithaSiva Jul 24, 2025
caf23c0
chore: add changeset
KavithaSiva Jul 24, 2025
c9acd39
chore: remove test
KavithaSiva Jul 24, 2025
09ee480
chore: fix failing test
KavithaSiva Jul 25, 2025
a5c02fa
chore: remove some mandatory validations
KavithaSiva Jul 25, 2025
6b705ae
Merge branch 'main' into process-module-results
KavithaSiva Jul 25, 2025
ede796c
remove validation & testsg
tomfrenken Jul 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions packages/orchestration/src/orchestration-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ describe('orchestration service client', () => {
"name": "add",
},
"id": "call_HPgxxSmD2ctYfcJ3gp1JBc7i",
"index": 0,
"type": "function",
},
{
Expand All @@ -850,9 +851,76 @@ describe('orchestration service client', () => {
"name": "multiply",
},
"id": "call_PExve0Dd9hxD8hOk4Uhr1yhO",
"index": 1,
"type": "function",
},
]
`);
});
describe('OrchestrationClient Stream Error Handling', () => {
  // Minimal module configuration shared by both error-handling scenarios.
  const buildConfig = (): OrchestrationModuleConfig => ({
    llm: {
      model_name: 'gpt-4o',
      model_params: {}
    },
    templating: {
      template: [{ role: 'user', content: 'Test prompt' }]
    }
  });

  it('should abort controller and re-throw error when network request fails', async () => {
    const moduleConfig = buildConfig();
    const abortController = new AbortController();

    // Simulate a server-side failure for the streaming completion call.
    mockInference(
      {
        data: constructCompletionPostRequest(moduleConfig, undefined, true)
      },
      {
        status: 500,
        data: 'Internal Server Error'
      },
      {
        url: 'inference/deployments/1234/completion'
      }
    );

    const client = new OrchestrationClient(moduleConfig);

    // The stream call must reject, and the controller must end up aborted.
    await expect(client.stream(undefined, abortController)).rejects.toThrow();
    expect(abortController.signal.aborted).toBe(true);
  });

  it('should handle aborted requests gracefully', async () => {
    const moduleConfig = buildConfig();
    const abortController = new AbortController();

    // Cancel before the request is ever issued.
    abortController.abort();

    const client = new OrchestrationClient(moduleConfig);

    // An already-aborted signal must surface as a rejection, not a hang.
    await expect(client.stream(undefined, abortController)).rejects.toThrow();
    expect(abortController.signal.aborted).toBe(true);
  });
});
});
81 changes: 53 additions & 28 deletions packages/orchestration/src/orchestration-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,31 @@ export class OrchestrationClient {
options?: StreamOptions,
requestConfig?: CustomRequestConfig
): Promise<OrchestrationStreamResponse<OrchestrationStreamChunkResponse>> {
if (typeof this.config === 'string' && options) {
logger.warn(
'Stream options are not supported when using a JSON module config.'
try {
if (typeof this.config === 'string' && options) {
logger.warn(
'Stream options are not supported when using a JSON module config.'
);
}

return this.createStreamResponse(
{
prompt,
requestConfig,
stream: true,
streamOptions: options
},
controller
);
}
} catch (error) {
logger.error('Error while creating stream response:', error);

return this.createStreamResponse(
{
prompt,
requestConfig,
stream: true,
streamOptions: options
},
controller
);
if (!controller.signal.aborted) {
controller.abort();
}

throw error;
}
}

private async executeRequest(options: RequestOptions): Promise<HttpResponse> {
Expand Down Expand Up @@ -131,23 +141,38 @@ export class OrchestrationClient {
const response =
new OrchestrationStreamResponse<OrchestrationStreamChunkResponse>();

const streamResponse = await this.executeRequest({
...options,
requestConfig: {
...options.requestConfig,
responseType: 'stream',
signal: controller.signal
}
});
try {
const streamResponse = await this.executeRequest({
...options,
requestConfig: {
...options.requestConfig,
responseType: 'stream',
signal: controller.signal
}
});

const stream = OrchestrationStream._create(streamResponse, controller);
response.stream = stream
._pipe(OrchestrationStream._processChunk)
._pipe(
OrchestrationStream._processOrchestrationStreamChunkResponse,
response
)
._pipe(OrchestrationStream._processStreamEnd, response);

return response;
} catch (error) {
logger.error(
'Error while creating orchestration stream response:',
error
);

const stream = OrchestrationStream._create(streamResponse, controller);
response.stream = stream
._pipe(OrchestrationStream._processChunk)
._pipe(OrchestrationStream._processToolCalls, response)
._pipe(OrchestrationStream._processFinishReason, response)
._pipe(OrchestrationStream._processTokenUsage, response);
if (!controller.signal.aborted) {
controller.abort();
}

return response;
throw error;
}
}

/**
Expand Down
132 changes: 95 additions & 37 deletions packages/orchestration/src/orchestration-stream-response.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,36 @@
import type { ToolCallAccumulator } from './util/index.js';
import { createLogger } from '@sap-cloud-sdk/util';
import type {
AssistantChatMessage,
ChatMessage,
ChatMessages,
CompletionPostResponse,
MessageToolCalls,
TokenUsage
} from './client/api/schema/index.js';
import type { OrchestrationStream } from './orchestration-stream.js';

const logger = createLogger({
package: 'orchestration',
messageContext: 'orchestration-stream-response'
});

/**
* Orchestration stream response.
*/
export class OrchestrationStreamResponse<T> {
private _usage: TokenUsage | undefined;
/**
* Finish reasons for all choices.
*/
private _finishReasons: Map<number, string> = new Map();
private _toolCallsAccumulators: Map<
number,
Map<number, ToolCallAccumulator>
> = new Map();
public _openStream = true;
public _data: Partial<CompletionPostResponse> = {};
private _stream: OrchestrationStream<T> | undefined;
private _toolCalls: Map<number, MessageToolCalls> = new Map();

/**
* Gets the token usage for the response.
* @returns The token usage for the response.
*/
public getTokenUsage(): TokenUsage | undefined {
return this._usage;
}

/**
* @internal
*/
_setTokenUsage(usage: TokenUsage): void {
this._usage = usage;
if (this.isStreamOpen()) {
return;
}
return this._data.orchestration_result?.usage;
}

/**
Expand All @@ -42,44 +39,86 @@ export class OrchestrationStreamResponse<T> {
* @returns The finish reason for the specified choice index.
*/
public getFinishReason(choiceIndex = 0): string | undefined {
return this._finishReasons.get(choiceIndex);
if (this.isStreamOpen()) {
return;
}
return this.findChoiceByIndex(choiceIndex)?.finish_reason;
}

/**
* @internal
* Parses the orchestration response and returns the content.
* If the response was filtered, an error is thrown.
* @param choiceIndex - The index of the choice to parse.
* @returns The message content.
*/
_getFinishReasons(): Map<number, string> {
return this._finishReasons;
public getContent(choiceIndex = 0): string | undefined {
if (this.isStreamOpen()) {
return;
}
const choice = this.findChoiceByIndex(choiceIndex);
return choice?.message?.content;
}

/**
* @internal
* Parses the orchestration response and returns the tool calls generated by the model.
* @param choiceIndex - The index of the choice to parse.
* @returns The message tool calls.
*/
_setFinishReasons(finishReasons: Map<number, string>): void {
this._finishReasons = finishReasons;
public getToolCalls(choiceIndex = 0): MessageToolCalls | undefined {
if (this.isStreamOpen()) {
return;
}
const choice = this.findChoiceByIndex(choiceIndex);
return choice?.message?.tool_calls;
}

/**
* Gets the tool calls for a specific choice index.
* @param choiceIndex - The index of the choice to get the tool calls for.
* @returns The tool calls for the specified choice index.
* Parses the orchestration response and returns the refusal message generated by the model.
* @param choiceIndex - The index of the choice to parse.
* @returns The refusal string.
*/
public getToolCalls(choiceIndex = 0): MessageToolCalls | undefined {
return this._toolCalls.get(choiceIndex);
public getRefusal(choiceIndex = 0): string | undefined {
if (this.isStreamOpen()) {
return;
}
const choice = this.findChoiceByIndex(choiceIndex);
return choice?.message?.refusal;
}

/**
* @internal
* Messages that can be used for subsequent prompts as message history.
* @param choiceIndex - The index of the choice to parse.
* @returns A list of all messages.
*/
_setToolCalls(choiceIndex: number, toolCalls: MessageToolCalls): void {
this._toolCalls.set(choiceIndex, toolCalls);
public getAllMessages(choiceIndex = 0): ChatMessages | undefined {
if (this.isStreamOpen()) {
return;
}
const messages: ChatMessage[] = this._data.module_results?.templating ?? [];
const content = this.findChoiceByIndex(choiceIndex)?.message;
return content ? [...messages, content] : messages;
}

/**
* @internal
* Gets the assistant message from the response.
* @param choiceIndex - The index of the choice to use (default is 0).
* @returns The assistant message.
*/
_getToolCallsAccumulators(): Map<number, Map<number, ToolCallAccumulator>> {
return this._toolCallsAccumulators;

public getAssistantMessage(
choiceIndex = 0
): AssistantChatMessage | undefined {
if (this.isStreamOpen()) {
return;
}
return this.findChoiceByIndex(choiceIndex)?.message;
}

public getResponse(): CompletionPostResponse | undefined {
if (this.isStreamOpen()) {
return;
}
return this._data as CompletionPostResponse;
}

get stream(): OrchestrationStream<T> {
Expand All @@ -89,6 +128,25 @@ export class OrchestrationStreamResponse<T> {
return this._stream;
}

private getChoices() {
return this._data.orchestration_result?.choices ?? [];
}

private findChoiceByIndex(index: number) {
return this.getChoices().find((c: { index: number }) => c.index === index);
}

private isStreamOpen(): boolean {
if (this._openStream) {
const stacktrace = new Error().stack;
logger.warn(
`The stream is still open, the requested data is not available yet. Please wait until the stream is closed.
Stacktrace: ${stacktrace}`
);
}
return this._openStream;
}

/**
* @internal
*/
Expand Down
Loading
Loading