Skip to content

feat: use sse for chat process api #638

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions service/src/chatgpt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,18 @@ search result: <search_result>${searchResultContent}</search_result>`,

const finish_reason = chunk.choices[0]?.finish_reason

// Build response object similar to the original implementation
// Build incremental response object
const responseChunk = {
id: chunk.id,
reasoning: responseReasoning,
text: responseText,
reasoning: responseReasoning, // Accumulated reasoning content
text: responseText, // Accumulated text content
role: 'assistant',
finish_reason,
// Incremental data
delta: {
reasoning: reasoningContent, // reasoning content in this chunk
text: content, // text content in this chunk
},
}

// Call the process callback if provided
Expand Down
5 changes: 5 additions & 0 deletions service/src/chatgpt/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ export interface ResponseChunk {
reasoning?: string
role?: string
finish_reason?: string
// 支持增量响应
delta?: {
reasoning?: string
text?: string
}
}

export interface RequestOptions {
Expand Down
74 changes: 64 additions & 10 deletions service/src/routes/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,12 @@ router.post('/chat-clear', auth, async (req, res) => {
})

router.post('/chat-process', [auth, limiter], async (req, res) => {
res.setHeader('Content-type', 'application/octet-stream')
// set headers for SSE
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.setHeader('Access-Control-Allow-Origin', '*')
res.setHeader('Access-Control-Allow-Headers', 'Cache-Control')

let { roomId, uuid, regenerate, prompt, uploadFileKeys, options = {}, systemMessage, temperature, top_p } = req.body as RequestProps
const userId = req.headers.userId.toString()
Expand All @@ -241,6 +246,22 @@ router.post('/chat-process', [auth, limiter], async (req, res) => {
let result
let message: ChatInfo
let user = await getUserById(userId)

// SSE helper functions
// Write one SSE frame to the response: an `event:` line naming the event, then a
// `data:` line holding the JSON-serialized payload followed by the blank line that
// terminates the frame.
const sendSSEData = (eventType: string, data: any) => {
  const eventLine = 'event: ' + eventType + '\n'
  res.write(eventLine)
  // Serialize only after the event line has been written.
  const serialized = JSON.stringify(data)
  res.write('data: ' + serialized + '\n\n')
}

// Emit an `error` SSE event whose payload is `{ message }`.
// The object is passed to sendSSEData as-is: sendSSEData already JSON-encodes its
// payload, so stringifying here as well would put a JSON *string literal* on the
// wire, and a client that parses the data line and reads `.message` would find
// nothing.
const sendSSEError = (error: string) => {
  sendSSEData('error', { message: error })
}

// Emit the terminating frame: an `end` event whose data line is the literal
// `[DONE]` sentinel (not JSON), followed by the blank line that closes the frame.
const sendSSEEnd = () => {
  const terminatorParts = ['event: end\n', 'data: [DONE]\n\n']
  for (const part of terminatorParts)
    res.write(part)
}

try {
// If use the fixed fakeuserid(some probability of duplicated with real ones), redefine user which is send to chatReplyProcess
if (userId === '6406d8c50aedd633885fa16f') {
Expand All @@ -251,29 +272,56 @@ router.post('/chat-process', [auth, limiter], async (req, res) => {
if (config.siteConfig?.usageCountLimit) {
const useAmount = user ? (user.useAmount ?? 0) : 0
if (Number(useAmount) <= 0 && user.limit_switch) {
res.send({ status: 'Fail', message: '提问次数用完啦 | Question limit reached', data: null })
sendSSEError('提问次数用完啦 | Question limit reached')
sendSSEEnd()
res.end()
return
}
}
}

if (config.auditConfig.enabled || config.auditConfig.customizeEnabled) {
if (!user.roles.includes(UserRole.Admin) && await containsSensitiveWords(config.auditConfig, prompt)) {
res.send({ status: 'Fail', message: '含有敏感词 | Contains sensitive words', data: null })
sendSSEError('含有敏感词 | Contains sensitive words')
sendSSEEnd()
res.end()
return
}
}

message = regenerate ? await getChat(roomId, uuid) : await insertChat(uuid, prompt, uploadFileKeys, roomId, model, options as ChatOptions)
let firstChunk = true

result = await chatReplyProcess({
message: prompt,
uploadFileKeys,
parentMessageId: options?.parentMessageId,
process: (chunk: ResponseChunk) => {
lastResponse = chunk

res.write(firstChunk ? JSON.stringify(chunk) : `\n${JSON.stringify(chunk)}`)
firstChunk = false
// set sse event by different data type
if (chunk.searchQuery) {
sendSSEData('search_query', { searchQuery: chunk.searchQuery })
}
if (chunk.searchResults) {
sendSSEData('search_results', {
searchResults: chunk.searchResults,
searchUsageTime: chunk.searchUsageTime,
})
}
if (chunk.delta) {
// send SSE event with delta type
sendSSEData('delta', { m: chunk.delta })
}
else {
// send all data
sendSSEData('message', {
id: chunk.id,
reasoning: chunk.reasoning,
text: chunk.text,
role: chunk.role,
finish_reason: chunk.finish_reason,
})
}
},
systemMessage,
temperature,
Expand All @@ -283,11 +331,17 @@ router.post('/chat-process', [auth, limiter], async (req, res) => {
room,
chatUuid: uuid,
})
// return the whole response including usage
res.write(`\n${JSON.stringify(result.data)}`)

// 发送最终完成数据
if (result && result.status === 'Success') {
sendSSEData('complete', result.data)
}

sendSSEEnd()
}
catch (error) {
res.write(JSON.stringify({ message: error?.message }))
sendSSEError(error?.message || 'Unknown error')
sendSSEEnd()
}
finally {
res.end()
Expand All @@ -299,7 +353,7 @@ router.post('/chat-process', [auth, limiter], async (req, res) => {
}

if (result.data === undefined)
// eslint-disable-next-line no-unsafe-finally
// eslint-disable-next-line no-unsafe-finally
return

if (regenerate && message.options.messageId) {
Expand Down
120 changes: 107 additions & 13 deletions src/api/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { AxiosProgressEvent, GenericAbortSignal } from 'axios'
import type { AnnounceConfig, AuditConfig, ConfigState, GiftCard, KeyConfig, MailConfig, SearchConfig, SiteConfig, Status, UserInfo, UserPassword, UserPrompt } from '@/components/common/Setting/model'
import type { SettingsState } from '@/store/modules/user/helper'
import { useUserStore } from '@/store'
import { useAuthStore, useUserStore } from '@/store'
import { get, post } from '@/utils/request'

export function fetchAnnouncement<T = any>() {
Expand All @@ -16,19 +15,32 @@ export function fetchChatConfig<T = any>() {
})
}

export function fetchChatAPIProcess<T = any>(
// Callbacks a consumer of the SSE chat stream may supply; every one is optional.
interface SSEEventHandlers {
// Receives a parsed payload that was not routed to one of the more specific handlers below.
onMessage?: (data: any) => void
// Receives one incremental update: the reasoning and/or text fragment carried by a single chunk.
onDelta?: (delta: { reasoning?: string, text?: string }) => void
// Receives the payload that carries the search query.
onSearchQuery?: (data: { searchQuery: string }) => void
// Receives the payload that carries the search results and the search usage time.
onSearchResults?: (data: { searchResults: any[], searchUsageTime: number }) => void
// Intended for the final result payload.
// NOTE(review): confirm the stream reader actually dispatches `complete` events to this callback.
onComplete?: (data: any) => void
// Receives the error message string reported by the stream.
onError?: (error: string) => void
// Called when the stream is over: the reader reports done or a `[DONE]` data line arrives.
onEnd?: () => void
}

// SSE chat processing function
export function fetchChatAPIProcessSSE(
params: {
roomId: number
uuid: number
regenerate?: boolean
prompt: string
uploadFileKeys?: string[]
options?: { conversationId?: string, parentMessageId?: string }
signal?: GenericAbortSignal
onDownloadProgress?: (progressEvent: AxiosProgressEvent) => void
signal?: AbortSignal
},
) {
handlers: SSEEventHandlers,
): Promise<void> {
const userStore = useUserStore()
const authStore = useAuthStore()

const data: Record<string, any> = {
roomId: params.roomId,
Expand All @@ -42,11 +54,93 @@ export function fetchChatAPIProcess<T = any>(
top_p: userStore.userInfo.advanced.top_p,
}

return post<T>({
url: '/chat-process',
data,
signal: params.signal,
onDownloadProgress: params.onDownloadProgress,
return new Promise((resolve, reject) => {
const baseURL = import.meta.env.VITE_GLOB_API_URL || ''
const url = `${baseURL}/api/chat-process`

fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': authStore.token ? `Bearer ${authStore.token}` : '',
},
body: JSON.stringify(data),
signal: params.signal,
}).then((response) => {
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`)
}

const reader = response.body?.getReader()
if (!reader) {
throw new Error('No reader available')
}

const decoder = new TextDecoder()
let buffer = ''

/**
 * Reads the response body chunk by chunk, splits it into SSE lines and forwards
 * every `data:` payload to the matching handler.
 *
 * Dispatch is driven by the preceding `event:` line (`delta`, `message`,
 * `search_query`, `search_results`, `complete`, `error`). Only when a data line
 * arrives without a recognised event name does it fall back to inspecting the
 * payload's fields. This keeps a `complete` payload that happens to contain a
 * `message` or `searchQuery` field from being mistaken for an error or a search
 * event, and it lets `onComplete` fire at all.
 *
 * Uses `reader`, `decoder`, `buffer`, `handlers`, `resolve` and `reject` from the
 * enclosing scope. Resolves the enclosing promise when the reader reports done or
 * a `[DONE]` data line is seen; read failures are passed to `reject`.
 */
function readStream(): void {
  // Name taken from the most recent `event:` line; it labels the `data:` line that
  // follows. Declared outside `pump` so it survives a frame split across two chunks.
  let currentEvent = ''

  // Tell the caller the stream is over and settle the enclosing promise.
  const finish = (): void => {
    handlers.onEnd?.()
    resolve()
  }

  // Route one parsed payload to the handler that matches its event name.
  const dispatch = (eventType: string, payload: any): void => {
    switch (eventType) {
      case 'error': {
        // The payload may arrive as a JSON-encoded string (an object that was
        // stringified twice); unwrap it so the message is still found.
        let detail = payload
        if (typeof detail === 'string') {
          try {
            detail = JSON.parse(detail)
          }
          catch {
            // Not JSON: the string itself is the error text.
          }
        }
        handlers.onError?.(typeof detail === 'string' ? detail : (detail?.message ?? 'Unknown error'))
        return
      }
      case 'search_query':
        handlers.onSearchQuery?.(payload)
        return
      case 'search_results':
        handlers.onSearchResults?.(payload)
        return
      case 'delta':
        if (payload?.m)
          handlers.onDelta?.(payload.m)
        return
      case 'complete':
        // Callers that did not register onComplete still receive the final payload
        // through onMessage.
        if (handlers.onComplete)
          handlers.onComplete(payload)
        else
          handlers.onMessage?.(payload)
        return
      case 'message':
        handlers.onMessage?.(payload)
        return
      default:
        // No (or an unknown) event name: infer the kind from the payload's fields.
        if (payload?.message)
          handlers.onError?.(payload.message)
        else if (payload?.searchQuery)
          handlers.onSearchQuery?.(payload)
        else if (payload?.searchResults)
          handlers.onSearchResults?.(payload)
        else if (payload?.m)
          handlers.onDelta?.(payload.m)
        else
          handlers.onMessage?.(payload)
    }
  }

  // Read one chunk, process every complete line in it, then schedule the next read.
  const pump = (): void => {
    reader!.read().then(({ done, value }) => {
      if (done) {
        finish()
        return
      }

      buffer += decoder.decode(value, { stream: true })
      const lines = buffer.split('\n')
      buffer = lines.pop() || '' // Keep the incomplete line in buffer

      for (const line of lines) {
        if (line.trim() === '') {
          // A blank line closes the current frame, so its event name no longer applies.
          currentEvent = ''
          continue
        }

        if (line.startsWith('event: ')) {
          currentEvent = line.substring(7).trim()
          continue
        }

        if (line.startsWith('data: ')) {
          const raw = line.substring(6).trim()

          // `[DONE]` is a plain sentinel, not JSON; it ends the stream.
          if (raw === '[DONE]') {
            finish()
            return
          }

          try {
            dispatch(currentEvent, JSON.parse(raw))
          }
          catch (e) {
            console.error('Failed to parse SSE data:', raw, e)
          }
        }
      }

      pump()
    }).catch(reject)
  }

  pump()
}

readStream()
}).catch(reject)
})
}

Expand Down Expand Up @@ -94,7 +188,7 @@ export function fetchLogin<T = any>(username: string, password: string, token?:
export function fetchLogout<T = any>() {
return post<T>({
url: '/user-logout',
data: { },
data: {},
})
}

Expand Down Expand Up @@ -309,7 +403,7 @@ export function fetchGetChatHistory<T = any>(roomId: number, lastId?: number, al
export function fetchClearAllChat<T = any>() {
return post<T>({
url: '/chat-clear-all',
data: { },
data: {},
})
}

Expand Down
2 changes: 1 addition & 1 deletion src/views/chat/components/Message/Search.vue
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const instance = getCurrentInstance()
const uid = instance?.uid || Date.now() + Math.random().toString(36).substring(2)

const textRef = ref<HTMLElement>()
const isCollapsed = ref(false)
const isCollapsed = ref(true)

const searchBtnTitle = computed(() => {
return t('chat.expandCollapseSearchResults')
Expand Down
Loading