Skip to content

feat: use sse for chat process api #638

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 11, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions service/src/chatgpt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,18 @@ search result: <search_result>${searchResultContent}</search_result>`,

const finish_reason = chunk.choices[0]?.finish_reason

// Build response object similar to the original implementation
// 构建增量响应对象
const responseChunk = {
id: chunk.id,
reasoning: responseReasoning,
text: responseText,
reasoning: responseReasoning, // 累积的推理内容
text: responseText, // 累积的文本内容
role: 'assistant',
finish_reason,
// 增量数据,只包含本次新增的内容
delta: {
reasoning: reasoningContent, // 本次新增的推理内容
text: content, // 本次新增的文本内容
},
}

// Call the process callback if provided
Expand Down
5 changes: 5 additions & 0 deletions service/src/chatgpt/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ export interface ResponseChunk {
reasoning?: string
role?: string
finish_reason?: string
// 支持增量响应
delta?: {
reasoning?: string
text?: string
}
}

export interface RequestOptions {
Expand Down
74 changes: 64 additions & 10 deletions service/src/routes/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,12 @@ router.post('/chat-clear', auth, async (req, res) => {
})

router.post('/chat-process', [auth, limiter], async (req, res) => {
res.setHeader('Content-type', 'application/octet-stream')
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.setHeader('Access-Control-Allow-Origin', '*')
res.setHeader('Access-Control-Allow-Headers', 'Cache-Control')

let { roomId, uuid, regenerate, prompt, uploadFileKeys, options = {}, systemMessage, temperature, top_p } = req.body as RequestProps
const userId = req.headers.userId.toString()
Expand All @@ -241,6 +246,22 @@ router.post('/chat-process', [auth, limiter], async (req, res) => {
let result
let message: ChatInfo
let user = await getUserById(userId)

// SSE helper: writes one named event to the response. The `event:` line is
// written first, then a `data:` line holding the JSON-encoded payload and the
// blank line that terminates the event.
const sendSSEData = (eventType: string, data: any) => {
  res.write('event: ' + eventType + '\n')
  const encodedPayload = JSON.stringify(data)
  res.write('data: ' + encodedPayload + '\n\n')
}

// Emits an `error` event whose payload is `{ message }`.
// The object is passed through as-is because sendSSEData already JSON-encodes
// its payload; stringifying here as well would put a JSON *string* on the wire,
// and a consumer that parses the data line once would not find `.message`.
const sendSSEError = (error: string) => {
  sendSSEData('error', { message: error })
}

// Emits the terminal `end` event whose data is the literal `[DONE]` sentinel.
const sendSSEEnd = () => {
  const terminator = ['event: end\n', 'data: [DONE]\n\n']
  for (const part of terminator)
    res.write(part)
}

try {
// If use the fixed fakeuserid(some probability of duplicated with real ones), redefine user which is send to chatReplyProcess
if (userId === '6406d8c50aedd633885fa16f') {
Expand All @@ -251,29 +272,56 @@ router.post('/chat-process', [auth, limiter], async (req, res) => {
if (config.siteConfig?.usageCountLimit) {
const useAmount = user ? (user.useAmount ?? 0) : 0
if (Number(useAmount) <= 0 && user.limit_switch) {
res.send({ status: 'Fail', message: '提问次数用完啦 | Question limit reached', data: null })
sendSSEError('提问次数用完啦 | Question limit reached')
sendSSEEnd()
res.end()
return
}
}
}

if (config.auditConfig.enabled || config.auditConfig.customizeEnabled) {
if (!user.roles.includes(UserRole.Admin) && await containsSensitiveWords(config.auditConfig, prompt)) {
res.send({ status: 'Fail', message: '含有敏感词 | Contains sensitive words', data: null })
sendSSEError('含有敏感词 | Contains sensitive words')
sendSSEEnd()
res.end()
return
}
}

message = regenerate ? await getChat(roomId, uuid) : await insertChat(uuid, prompt, uploadFileKeys, roomId, model, options as ChatOptions)
let firstChunk = true

result = await chatReplyProcess({
message: prompt,
uploadFileKeys,
parentMessageId: options?.parentMessageId,
process: (chunk: ResponseChunk) => {
lastResponse = chunk

res.write(firstChunk ? JSON.stringify(chunk) : `\n${JSON.stringify(chunk)}`)
firstChunk = false
// 根据数据类型发送不同的 SSE 事件
if (chunk.searchQuery) {
sendSSEData('search_query', { searchQuery: chunk.searchQuery })
}
if (chunk.searchResults) {
sendSSEData('search_results', {
searchResults: chunk.searchResults,
searchUsageTime: chunk.searchUsageTime,
})
}
if (chunk.delta) {
// 发送增量数据
sendSSEData('delta', { m: chunk.delta })
}
else {
// 兼容现有格式,发送完整数据但标记为增量类型
sendSSEData('message', {
id: chunk.id,
reasoning: chunk.reasoning,
text: chunk.text,
role: chunk.role,
finish_reason: chunk.finish_reason,
})
}
},
systemMessage,
temperature,
Expand All @@ -283,11 +331,17 @@ router.post('/chat-process', [auth, limiter], async (req, res) => {
room,
chatUuid: uuid,
})
// return the whole response including usage
res.write(`\n${JSON.stringify(result.data)}`)

// 发送最终完成数据
if (result && result.status === 'Success') {
sendSSEData('complete', result.data)
}

sendSSEEnd()
}
catch (error) {
res.write(JSON.stringify({ message: error?.message }))
sendSSEError(error?.message || 'Unknown error')
sendSSEEnd()
}
finally {
res.end()
Expand All @@ -299,7 +353,7 @@ router.post('/chat-process', [auth, limiter], async (req, res) => {
}

if (result.data === undefined)
// eslint-disable-next-line no-unsafe-finally
// eslint-disable-next-line no-unsafe-finally
return

if (regenerate && message.options.messageId) {
Expand Down
136 changes: 133 additions & 3 deletions src/api/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { AxiosProgressEvent, GenericAbortSignal } from 'axios'
import type { AnnounceConfig, AuditConfig, ConfigState, GiftCard, KeyConfig, MailConfig, SearchConfig, SiteConfig, Status, UserInfo, UserPassword, UserPrompt } from '@/components/common/Setting/model'
import type { SettingsState } from '@/store/modules/user/helper'
import { useUserStore } from '@/store'
import { useAuthStore, useUserStore } from '@/store'
import { get, post } from '@/utils/request'

export function fetchAnnouncement<T = any>() {
Expand All @@ -16,6 +16,136 @@ export function fetchChatConfig<T = any>() {
})
}

// Callbacks for the server-sent events of the chat process stream.
// Every handler is optional; the stream reader skips handlers that are absent.
interface SSEEventHandlers {
// Receives payloads that are not routed to a more specific handler,
// e.g. the full-chunk payload of a `message` event.
onMessage?: (data: any) => void
// Receives the incremental reasoning/text of a `delta` event.
onDelta?: (delta: { reasoning?: string, text?: string }) => void
// Receives a payload that carries `searchQuery`.
onSearchQuery?: (data: { searchQuery: string }) => void
// Receives a payload that carries `searchResults` (and `searchUsageTime`).
onSearchResults?: (data: { searchResults: any[], searchUsageTime: number }) => void
// Meant for the final result sent with the server's `complete` event.
// NOTE(review): confirm the stream reader actually dispatches to this handler.
onComplete?: (data: any) => void
// Receives the error message text reported by the server.
onError?: (error: string) => void
// Called when the stream finishes: on the `[DONE]` marker or when the body ends.
onEnd?: () => void
}

/**
 * Extracts the message text from an SSE `error` payload.
 * Accepts a `{ message }` object as well as a payload that was JSON-encoded
 * twice (a string that itself contains the JSON of `{ message }`).
 */
function extractSSEErrorMessage(payload: unknown): string {
  let unwrapped: unknown = payload
  if (typeof payload === 'string') {
    try {
      unwrapped = JSON.parse(payload)
    }
    catch {
      // Not JSON: treat the string itself as the error text.
      return payload
    }
  }
  if (typeof unwrapped === 'object' && unwrapped !== null && 'message' in unwrapped)
    return String((unwrapped as { message: unknown }).message)
  return typeof payload === 'string' ? payload : 'Unknown error'
}

/**
 * Routes one parsed SSE payload to the handler that matches its event name.
 * Payloads with a missing or unknown event name fall back to shape-based
 * detection so streams without `event:` lines keep working.
 */
function dispatchSSEEvent(eventType: string, payload: any, handlers: SSEEventHandlers): void {
  switch (eventType) {
    case 'error':
      handlers.onError?.(extractSSEErrorMessage(payload))
      return
    case 'search_query':
      handlers.onSearchQuery?.(payload)
      return
    case 'search_results':
      handlers.onSearchResults?.(payload)
      return
    case 'delta':
      // The server wraps the increment as `{ m: delta }`.
      handlers.onDelta?.(payload?.m ?? payload)
      return
    case 'complete':
      // Callers without an onComplete handler still receive the final payload
      // through onMessage.
      if (handlers.onComplete)
        handlers.onComplete(payload)
      else
        handlers.onMessage?.(payload)
      return
    case 'message':
      handlers.onMessage?.(payload)
      return
  }

  // Unnamed or unknown event: infer the kind from the payload shape.
  if (payload?.message)
    handlers.onError?.(payload.message)
  else if (payload?.searchQuery)
    handlers.onSearchQuery?.(payload)
  else if (payload?.searchResults)
    handlers.onSearchResults?.(payload)
  else if (payload?.m)
    handlers.onDelta?.(payload.m)
  else
    handlers.onMessage?.(payload)
}

/**
 * Sends a chat request to `/api/chat-process` and consumes the response as a
 * server-sent event stream, forwarding each event to `handlers`.
 *
 * @param params   Chat request fields plus an optional `signal` to abort the request.
 * @param handlers Callbacks invoked for the individual stream events.
 * @returns A promise that resolves after `onEnd` has been called (on the
 *          `[DONE]` marker or when the body ends).
 * @throws Rejects when the HTTP status is not ok, when the response has no
 *         readable body, or when fetching/reading fails (including abort).
 */
export async function fetchChatAPIProcessSSE(
  params: {
    roomId: number
    uuid: number
    regenerate?: boolean
    prompt: string
    uploadFileKeys?: string[]
    options?: { conversationId?: string, parentMessageId?: string }
    signal?: AbortSignal
  },
  handlers: SSEEventHandlers,
): Promise<void> {
  const userStore = useUserStore()
  const authStore = useAuthStore()

  const requestBody: Record<string, any> = {
    roomId: params.roomId,
    uuid: params.uuid,
    regenerate: params.regenerate ?? false,
    prompt: params.prompt,
    uploadFileKeys: params.uploadFileKeys,
    options: params.options,
    systemMessage: userStore.userInfo.advanced.systemMessage,
    temperature: userStore.userInfo.advanced.temperature,
    top_p: userStore.userInfo.advanced.top_p,
  }

  const baseURL = import.meta.env.VITE_GLOB_API_URL || ''
  const response = await fetch(`${baseURL}/api/chat-process`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': authStore.token ? `Bearer ${authStore.token}` : '',
    },
    body: JSON.stringify(requestBody),
    signal: params.signal,
  })

  if (!response.ok)
    throw new Error(`HTTP error! status: ${response.status}`)

  const reader = response.body?.getReader()
  if (!reader)
    throw new Error('No reader available')

  const decoder = new TextDecoder()
  let buffer = ''
  // Name from the most recent `event:` line; cleared by the blank line that ends an event.
  let currentEvent = ''

  // Processes one line of the stream. Returns true when the `[DONE]` marker was seen.
  const handleLine = (line: string): boolean => {
    if (line.trim() === '') {
      currentEvent = ''
      return false
    }

    if (line.startsWith('event:')) {
      currentEvent = line.slice(6).trim()
      return false
    }

    if (!line.startsWith('data:'))
      return false

    const raw = line.slice(5).trim()
    if (raw === '[DONE]')
      return true

    let payload: any
    try {
      payload = JSON.parse(raw)
    }
    catch (e) {
      console.error('Failed to parse SSE data:', raw, e)
      return false
    }

    // Kept separate from parsing so a throwing handler is not reported as a
    // parse failure; one bad handler call must not abort the stream.
    try {
      dispatchSSEEvent(currentEvent, payload, handlers)
    }
    catch (e) {
      console.error('SSE handler failed:', currentEvent, e)
    }
    return false
  }

  try {
    let sawDone = false
    while (!sawDone) {
      const { done, value } = await reader.read()
      if (done) {
        // Flush bytes still held by the decoder and a final unterminated line.
        buffer += decoder.decode()
        if (buffer !== '')
          handleLine(buffer)
        break
      }

      buffer += decoder.decode(value, { stream: true })
      const lines = buffer.split('\n')
      buffer = lines.pop() ?? '' // Keep the incomplete line in the buffer

      for (const line of lines) {
        if (handleLine(line)) {
          sawDone = true
          break
        }
      }
    }

    handlers.onEnd?.()
  }
  finally {
    // Release the connection even when we stop early or reading failed.
    reader.cancel().catch(() => {})
  }
}

// 保持向后兼容的函数(如果需要的话)
export function fetchChatAPIProcess<T = any>(
params: {
roomId: number
Expand Down Expand Up @@ -94,7 +224,7 @@ export function fetchLogin<T = any>(username: string, password: string, token?:
export function fetchLogout<T = any>() {
return post<T>({
url: '/user-logout',
data: { },
data: {},
})
}

Expand Down Expand Up @@ -309,7 +439,7 @@ export function fetchGetChatHistory<T = any>(roomId: number, lastId?: number, al
export function fetchClearAllChat<T = any>() {
return post<T>({
url: '/chat-clear-all',
data: { },
data: {},
})
}

Expand Down
Loading