Skip to content

feat: 增加fetch请求,优化axios对流式数据处理的不友好 #640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 50 additions & 76 deletions src/api/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import type { AnnounceConfig, AuditConfig, ConfigState, GiftCard, KeyConfig, MailConfig, SearchConfig, SiteConfig, Status, UserInfo, UserPassword, UserPrompt } from '@/components/common/Setting/model'
import type { SettingsState } from '@/store/modules/user/helper'
import { useAuthStore, useUserStore } from '@/store'
import { useUserStore } from '@/store'
import { get, post } from '@/utils/request'
import fetchService from '@/utils/request/fetchService'

export function fetchAnnouncement<T = any>() {
return post<T>({
Expand All @@ -26,7 +27,7 @@ interface SSEEventHandlers {
onEnd?: () => void
}

// SSE chat processing function
// SSE chat processing function using custom fetch service
export function fetchChatAPIProcessSSE(
params: {
roomId: number
Expand All @@ -40,7 +41,6 @@ export function fetchChatAPIProcessSSE(
handlers: SSEEventHandlers,
): Promise<void> {
const userStore = useUserStore()
const authStore = useAuthStore()

const data: Record<string, any> = {
roomId: params.roomId,
Expand All @@ -55,92 +55,66 @@ export function fetchChatAPIProcessSSE(
}

return new Promise((resolve, reject) => {
const baseURL = import.meta.env.VITE_GLOB_API_URL || ''
const url = `${baseURL}/api/chat-process`

fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': authStore.token ? `Bearer ${authStore.token}` : '',
fetchService.postStream(
{
url: '/chat-process',
body: data,
signal: params.signal,
},
body: JSON.stringify(data),
signal: params.signal,
}).then((response) => {
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`)
}

const reader = response.body?.getReader()
if (!reader) {
throw new Error('No reader available')
}

const decoder = new TextDecoder()
let buffer = ''

function readStream(): void {
reader!.read().then(({ done, value }) => {
if (done) {
handlers.onEnd?.()
resolve()
{
onChunk: (line: string) => {
if (line.trim() === '')
return
}

buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
buffer = lines.pop() || '' // Keep the incomplete line in buffer
if (line.startsWith('event: ')) {
// const _eventType = line.substring(7).trim()
return
}

for (const line of lines) {
if (line.trim() === '')
continue
if (line.startsWith('data: ')) {
const data = line.substring(6).trim()

if (line.startsWith('event: ')) {
// const _eventType = line.substring(7).trim()
continue
if (data === '[DONE]') {
handlers.onEnd?.()
resolve()
return
}

if (line.startsWith('data: ')) {
const data = line.substring(6).trim()
try {
const jsonData = JSON.parse(data)

if (data === '[DONE]') {
handlers.onEnd?.()
resolve()
return
// Dispatch to different handlers based on data type
if (jsonData.message) {
handlers.onError?.(jsonData.message)
}

try {
const jsonData = JSON.parse(data)

// 根据前面的 event 类型分发到不同的处理器
if (jsonData.message) {
handlers.onError?.(jsonData.message)
}
else if (jsonData.searchQuery) {
handlers.onSearchQuery?.(jsonData)
}
else if (jsonData.searchResults) {
handlers.onSearchResults?.(jsonData)
}
else if (jsonData.m) {
handlers.onDelta?.(jsonData.m)
}
else {
handlers.onMessage?.(jsonData)
}
else if (jsonData.searchQuery) {
handlers.onSearchQuery?.(jsonData)
}
else if (jsonData.searchResults) {
handlers.onSearchResults?.(jsonData)
}
catch (e) {
console.error('Failed to parse SSE data:', data, e)
else if (jsonData.m) {
handlers.onDelta?.(jsonData.m)
}
else {
handlers.onMessage?.(jsonData)
}
}
catch (e) {
console.error('Failed to parse SSE data:', data, e)
}
}

readStream()
}).catch(reject)
}

readStream()
}).catch(reject)
},
onError: (error: Error) => {
handlers.onError?.(error.message)
reject(error)
},
onComplete: () => {
handlers.onEnd?.()
resolve()
},
},
)
})
}

Expand Down
147 changes: 147 additions & 0 deletions src/utils/request/fetchService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import { useAuthStore } from '@/store'

export interface FetchRequestConfig {
url: string
method?: string
headers?: Record<string, string>
body?: any
signal?: AbortSignal
}

export interface FetchResponse<T = any> {
data: T
status: number
statusText: string
headers: Headers
}

export interface SSEStreamOptions {
onChunk?: (chunk: string) => void
onError?: (error: Error) => void
onComplete?: () => void
}

class FetchService {
private baseURL: string
private defaultHeaders: Record<string, string>

constructor() {
this.baseURL = import.meta.env.VITE_GLOB_API_URL || ''
this.defaultHeaders = {
'Content-Type': 'application/json',
}
}

// Request interceptor - automatically add authentication headers and other configurations
private requestInterceptor(config: FetchRequestConfig): FetchRequestConfig {
const token = useAuthStore().token
const headers = { ...this.defaultHeaders, ...config.headers }

if (token) {
headers.Authorization = `Bearer ${token}`
}

return {
...config,
headers,
}
}

// Response interceptor - handle error status
private async responseInterceptor(response: Response): Promise<Response> {
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`)
}
return response
}

// POST request
async post<T = any>(config: FetchRequestConfig): Promise<FetchResponse<T>> {
const processedConfig = this.requestInterceptor(config)
const url = `${this.baseURL}${processedConfig.url}`

const response = await fetch(url, {
method: 'POST',
headers: processedConfig.headers,
body: typeof processedConfig.body === 'object'
? JSON.stringify(processedConfig.body)
: processedConfig.body,
signal: processedConfig.signal,
})

const processedResponse = await this.responseInterceptor(response)
const data = await processedResponse.json()

return {
data,
status: processedResponse.status,
statusText: processedResponse.statusText,
headers: processedResponse.headers,
}
}

// SSE streaming request
async postStream(config: FetchRequestConfig, options: SSEStreamOptions): Promise<void> {
const processedConfig = this.requestInterceptor(config)
const url = `${this.baseURL}${processedConfig.url}`

try {
const response = await fetch(url, {
method: 'POST',
headers: processedConfig.headers,
body: typeof processedConfig.body === 'object'
? JSON.stringify(processedConfig.body)
: processedConfig.body,
signal: processedConfig.signal,
})

await this.responseInterceptor(response)

if (!response.body) {
throw new Error('ReadableStream not supported')
}

const reader = response.body.getReader()
const decoder = new TextDecoder()
let buffer = ''

try {
while (true) {
const { done, value } = await reader.read()

if (done) {
options.onComplete?.()
break
}

// Decode the chunk and add to buffer
buffer += decoder.decode(value, { stream: true })

// Process complete lines
const lines = buffer.split('\n')
// Keep the last potentially incomplete line
buffer = lines.pop() || ''

for (const line of lines) {
if (line.trim()) {
options.onChunk?.(line)
}
}
}
}
catch (error) {
options.onError?.(error as Error)
throw error
}
}
catch (error) {
options.onError?.(error as Error)
throw error
}
}
}

// Create singleton instance
const fetchService = new FetchService()

export default fetchService
24 changes: 24 additions & 0 deletions src/views/chat/index.vue
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,10 @@ async function onRegenerate(index: number) {
let lastText = ''
let accumulatedReasoning = ''
const fetchChatAPIOnce = async () => {
let searchQuery: string
let searchResults: Chat.SearchResult[]
let searchUsageTime: number

await fetchChatAPIProcessSSE({
roomId: currentChatRoom.value!.roomId,
uuid: chatUuid || Date.now(),
Expand All @@ -377,6 +381,13 @@ async function onRegenerate(index: number) {
options,
signal: controller.signal,
}, {
onSearchQuery: (data) => {
searchQuery = data.searchQuery
},
onSearchResults: (data) => {
searchResults = data.searchResults
searchUsageTime = data.searchUsageTime
},
onDelta: async (delta) => {
// 处理增量数据
if (delta.text) {
Expand All @@ -391,6 +402,9 @@ async function onRegenerate(index: number) {
index,
{
dateTime: new Date().toLocaleString(),
searchQuery,
searchResults,
searchUsageTime,
reasoning: accumulatedReasoning,
text: lastText,
inversion: false,
Expand All @@ -406,6 +420,13 @@ async function onRegenerate(index: number) {
},
onMessage: async (data) => {
// Handle complete message data (compatibility mode)
if (data.searchQuery)
searchQuery = data.searchQuery
if (data.searchResults)
searchResults = data.searchResults
if (data.searchUsageTime)
searchUsageTime = data.searchUsageTime
// Handle complete message data (compatibility mode)
const usage = (data.detail && data.detail.usage)
? {
completion_tokens: data.detail.usage.completion_tokens || null,
Expand All @@ -419,6 +440,9 @@ async function onRegenerate(index: number) {
index,
{
dateTime: new Date().toLocaleString(),
searchQuery,
searchResults,
searchUsageTime,
reasoning: data?.reasoning,
finish_reason: data?.finish_reason,
text: data.text ?? '',
Expand Down