Skip to content

Commit 52a81de

Browse files
committed
feat: Implement streaming message integration in conversation loader
- Added functionality to load additional messages from streaming files based on session ID. - Merged non-duplicate streaming messages with existing conversation history. - Updated conversation metadata, including message count and end time. - Introduced a new utility function `getStreamingMessages` to retrieve messages from streaming files. This enhancement improves the conversation loading process by ensuring that the latest messages are included, providing a more comprehensive chat history.
1 parent 4c7d3b6 commit 52a81de

File tree

2 files changed

+147
-1
lines changed

2 files changed

+147
-1
lines changed

backend/history/conversationLoader.ts

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import type { ConversationHistory } from "../../shared/types.ts";
88
import type { Runtime } from "../runtime/types.ts";
99
import { processConversationMessages } from "./timestampRestore.ts";
1010
import { validateEncodedProjectName } from "./pathUtils.ts";
11+
import { getStreamingMessages } from "../streaming/streamingFileManager.ts";
1112

1213
/**
1314
* Load a specific conversation by session ID
@@ -47,6 +48,66 @@ export async function loadConversation(
4748
sessionId,
4849
runtime,
4950
);
51+
52+
// Check for additional messages in streaming files
53+
const streamingMessages = await getStreamingMessages(
54+
encodedProjectName,
55+
sessionId,
56+
runtime,
57+
);
58+
59+
if (streamingMessages.length > 0) {
60+
// Merge streaming messages with conversation history
61+
// The streaming messages might contain duplicates or newer messages
62+
const existingMessageIds = new Set<string>();
63+
64+
// Build a set of existing message IDs/timestamps for deduplication
65+
for (const msg of conversationHistory.messages) {
66+
if (typeof msg === "object" && msg !== null && "timestamp" in msg) {
67+
existingMessageIds.add(JSON.stringify(msg));
68+
}
69+
}
70+
71+
// Add non-duplicate streaming messages
72+
let newMessagesAdded = 0;
73+
for (const streamingMsg of streamingMessages) {
74+
const msgKey = JSON.stringify(streamingMsg);
75+
if (!existingMessageIds.has(msgKey)) {
76+
conversationHistory.messages.push(streamingMsg);
77+
newMessagesAdded++;
78+
}
79+
}
80+
81+
if (newMessagesAdded > 0) {
82+
// Re-sort messages by timestamp if new messages were added
83+
conversationHistory.messages.sort(
84+
(a: unknown, b: unknown) => {
85+
const timeA = (a as { timestamp?: number }).timestamp || 0;
86+
const timeB = (b as { timestamp?: number }).timestamp || 0;
87+
return timeA - timeB;
88+
},
89+
);
90+
91+
// Update metadata
92+
conversationHistory.metadata.messageCount =
93+
conversationHistory.messages.length;
94+
if (conversationHistory.messages.length > 0) {
95+
const lastMsg = conversationHistory.messages[
96+
conversationHistory.messages.length - 1
97+
] as { timestamp?: number };
98+
if (lastMsg.timestamp) {
99+
conversationHistory.metadata.endTime = new Date(
100+
lastMsg.timestamp,
101+
).toISOString();
102+
}
103+
}
104+
105+
console.log(
106+
`[ConversationLoader] Merged ${newMessagesAdded} additional messages from streaming files for session ${sessionId}`,
107+
);
108+
}
109+
}
110+
50111
return conversationHistory;
51112
} catch (error) {
52113
throw error; // Re-throw any parsing errors
@@ -63,7 +124,10 @@ async function parseConversationFile(
63124
runtime: Runtime,
64125
): Promise<ConversationHistory> {
65126
const content = await runtime.readTextFile(filePath);
66-
const lines = content.trim().split("\n").filter((line) => line.trim());
127+
const lines = content
128+
.trim()
129+
.split("\n")
130+
.filter((line) => line.trim());
67131

68132
if (lines.length === 0) {
69133
throw new Error("Empty conversation file");

backend/streaming/streamingFileManager.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,3 +225,85 @@ export async function cleanupAllStreamingFiles(
225225
console.error(`Failed to clean up streaming directory:`, error);
226226
}
227227
}
228+
229+
/**
230+
* Get streaming messages for a specific sessionId
231+
* This searches through streaming files to find messages with matching sessionId
232+
*/
233+
export async function getStreamingMessages(
234+
encodedProjectName: string,
235+
sessionId: string,
236+
runtime: Runtime,
237+
): Promise<unknown[]> {
238+
const streamingDir = getStreamingDir(encodedProjectName, runtime);
239+
240+
// Check if streaming directory exists
241+
if (!(await runtime.exists(streamingDir))) {
242+
return [];
243+
}
244+
245+
const messages: unknown[] = [];
246+
247+
try {
248+
// List all files in the streaming directory
249+
const entries: string[] = [];
250+
for await (const entry of runtime.readDir(streamingDir)) {
251+
if (entry.isFile && entry.name.endsWith(".jsonl")) {
252+
entries.push(entry.name);
253+
}
254+
}
255+
256+
// Process each streaming file
257+
for (const file of entries) {
258+
const filePath = `${streamingDir}/${file}`;
259+
const content = await runtime.readTextFile(filePath);
260+
const lines = content
261+
.trim()
262+
.split("\n")
263+
.filter((line) => line.trim());
264+
265+
// Check each message for matching sessionId
266+
for (const line of lines) {
267+
try {
268+
const parsed = JSON.parse(line);
269+
270+
// Check if this is a claude_json message with SDK content
271+
if (
272+
parsed.type === "claude_json" &&
273+
parsed.content?.type === "sdk"
274+
) {
275+
const sdkMessage = parsed.content.message;
276+
277+
// Extract sessionId from different message types
278+
let messageSessionId: string | undefined;
279+
280+
if (sdkMessage.type === "system" && sdkMessage.session_id) {
281+
messageSessionId = sdkMessage.session_id;
282+
} else if (
283+
sdkMessage.type === "assistant" &&
284+
sdkMessage.session_id
285+
) {
286+
messageSessionId = sdkMessage.session_id;
287+
} else if (
288+
sdkMessage.type === "result" &&
289+
sdkMessage.session_id
290+
) {
291+
messageSessionId = sdkMessage.session_id;
292+
}
293+
294+
// If sessionId matches, add the SDK message
295+
if (messageSessionId === sessionId) {
296+
messages.push(sdkMessage);
297+
}
298+
}
299+
} catch (error) {
300+
console.error(`Failed to parse streaming message:`, error);
301+
}
302+
}
303+
}
304+
} catch (error) {
305+
console.error(`Failed to read streaming directory:`, error);
306+
}
307+
308+
return messages;
309+
}

0 commit comments

Comments
 (0)