做企业网站建设挣钱吗,如何自己做论坛网站,微信网站链接网站建设,区块链企业解决方案SSE流式输出的实现过程 后端处理 在创建流式会话时#xff0c;我们要对这个请求设置好SSE所需要的请求头#xff0c;然后再创建会话#xff0c;返回会话ID#xff0c;紧接着就把会话ID传给前端#xff0c;让前端绑定这个会话#xff0c;开始准备流式输出 controller层处理…SSE流式输出的实现过程后端处理在创建流式会话时我们要对这个请求设置好SSE所需要的请求头然后再创建会话返回会话ID紧接着就把会话ID传给前端让前端绑定这个会话开始准备流式输出controller层处理新建会话请求如果是对已存在的会话就不需要新建从请求体里获取会话ID进行绑定funcCreateStreamSessionAndSendMessage(c*gin.Context){req:new(CreateSessionAndSendMessageRequest)userName:c.GetString(userName)// From JWT middlewareiferr:c.ShouldBindJSON(req);err!nil{c.JSON(http.StatusOK,gin.H{error:Invalid parameters})return}// 设置SSE头c.Header(Content-Type,text/event-stream)// SSE协议的标识告诉客户端这个是流式输出c.Header(Cache-Control,no-cache, no-transform)// 让浏览器不缓存响应每次都获取新的内容禁止代理/CDN对响应内容做转换c.Header(Connection,keep-alive)// 保持长连接c.Header(Access-Control-Allow-Origin,*)// 允许跨域c.Header(X-Accel-Buffering,no)// 禁止Nginx的缓存区如果不设置这个请求头就会出现响应数据被堆积随后再发送给前端的情况设置为no的话就能让数据实时透传c.Header(Content-Encoding,identity)// 禁止对响应内容进行编码或者压缩没有这个也会导致消息被堆积// 先创建会话并立即把 sessionId 下发给前端随后再开始流式输出sessionID,code_:session.CreateStreamSessionOnly(userName,req.UserQuestion)ifcode_!code.CodeSuccess{c.SSEvent(error,gin.H{message:Failed to create session})return}// 先把 sessionId 通过 data 事件发送给前端前端据此绑定当前会话侧边栏即可出现新标签c.Writer.WriteString(fmt.Sprintf(data: {\sessionId\: \%s\}\n\n,sessionID))c.Writer.Flush()// 然后开始把本次回答进行流式发送包含最后的 [DONE]code_session.StreamMessageToExistingSession(userName,sessionID,req.UserQuestion,req.ModelType,http.ResponseWriter(c.Writer))ifcode_!code.CodeSuccess{c.SSEvent(error,gin.H{message:Failed to send message})return}}service层处理逻辑利用Flush把服务端响应的数据直接推送给客户端不需要等待缓冲区满了才推送。funcStreamMessageToExistingSession(userNamestring,sessionIDstring,userQuestionstring,modelTypestring,writer http.ResponseWriter)code.Code{log.Printf([Service] StreamMessageToExistingSession Start. User%s, Session%s, Model%s,userName,sessionID,modelType)// 确保writer支持Flushflusher,ok:writer.(http.Flusher)// 类型断言if!ok{log.Println(不支持Flush)returncode.CodeServerBusy}//2获取AIHelper并通过其管理消息manager:aihelper.GetGlobalManager()config:map[string]interface{}{apiKey:your-api-key,// TODO: 从配置中获取username:userName,// 用于 RAG 模型获取用户文档}log.Println([Service] Getting AIHelper...)helper,err:manager.GetOrCreateAIHelper(userName,sessionID,modelType,config)iferr!nil{log.Println(StreamMessageToExistingSession GetOrCreateAIHelper error:,err)returncode.AIModelFail}log.Println([Service] AIHelper Obtained. Starting StreamResponse...)// 定义callback函数来实时flush推送消息cb:func(msgstring){log.Printf([SSE] Sending chunk: %s (len%d)\n,msg,len(msg))payload,er:json.Marshal(map[string]string){type:delta,content:msg,}iferr!nil{log.Println([SSE] Marshal error:,err)return}_,errwriter.Write([]byte(data: string(payload)\n\n))iferr!nil{log.Println([SSE] Write error:,err)return}flusher.Flush()log.Println([SSE] Flushed)}_,err_:helper.StreamResponse(userName,ctx,cb,userQuestion)iferr_!nil{log.Println(StreamMessageToExistingSession StreamResponse error:,err_)returncode.AIModelFail}_,errwriter.Write([]byte(data: [DONE]\n\n))iferr!nil{log.Println(StreamMessageToExistingSession write DONE error:,err)returncode.AIModelFail}flusher.Flush()returncode.CodeSuccess}这里manager和aihelper的执行逻辑就不展示了这篇blog的目的是要了解流式传输的逻辑。每次生成响应的时候回调cb来将AI生成的消息实时推送给客户端流响应结束后会额外发一条data:[DONE]前端根据这个来判断此次回答是否结束。前端处理关键是通过fetch建立一个长连接后面从response.body里流式读取数据直到读到done后breakasyncfunctionhandleStreaming(question){// 占一个位置表示正在回答constaiMessage{role:assistant,content:,meta:{status:streaming}// mark streaming}constaiMessageIndexcurrentMessages.value.length currentMessages.value.push(aiMessage)// 决定用哪个URLconstisDevwindow.location.hostnamelocalhost||window.location.hostname127.0.0.1constbackendBaseisDev?http://${window.location.hostname}:9090/api/v1/AI:/api/AIconsturltempSession.value?${backendBase}/chat/send-stream-new-session:${backendBase}/chat/send-stream// 构建请求头请求体constheaders{Content-Type:application/json,Authorization:Bearer${localStorage.getItem(token)||}}constbodytempSession.value?{question:question,modelType:selectedModel.value}:{question:question,modelType:selectedModel.value,sessionId:currentSessionId.value}}try{// 创建 fetch 连接读取 SSE 流constresponseawaitfetch(url,{method:POST,headers,body:JSON.stringify(body)})if(!response.ok){loading.valuefalsethrownewError(Network response was not ok)}constreaderresponse.body.getReader()constdecodernewTextDecoder()letbuffer// 读取流数据// eslint-disable-next-line no-constant-conditionwhile(true){const{done,value}awaitreader.read()if(done)breakconstchunkdecoder.decode(value,{stream:true})bufferchunk// 按行分割constlinesbuffer.split(\n)bufferlines.pop()||// 保留未完成的行for(constlineoflines){constnormalizedLineline.endsWith(\r)?line.slice(0,-1):lineif(!normalizedLine)continue// 处理 SSE 格式data: contentif(normalizedLine.startsWith(data:)){letdatanormalizedLine.slice(5)if(data.startsWith( ))datadata.slice(1)console.log([SSE] Received:,data)// 调试日志if(data[DONE]){// 流结束console.log([SSE] Stream done)loading.valuefalsecurrentMessages.value[aiMessageIndex].meta{status:done}currentMessages.value[...currentMessages.value]}elseif(data.startsWith({)){// 尝试解析 JSON如 sessionIdtry{constparsedJSON.parse(data)if(parsed.sessionId){constnewSidString(parsed.sessionId)console.log([SSE] Session ID:,newSid)if(tempSession.value){lettitle(question||).trim()if(!title){title会话${newSid}}elseif(title.length30){title${title.slice(0,30)}...}sessions.value[newSid]{id:newSid,name:title,messages:[...currentMessages.value]}currentSessionId.valuenewSid tempSession.valuefalse}}elseif(parsed.typedeltatypeofparsed.contentstring){currentMessages.value[aiMessageIndex].contentparsed.content}}catch(e){// 不是 JSON当作普通文本处理currentMessages.value[aiMessageIndex].contentdataconsole.log([SSE] Content updated:,currentMessages.value[aiMessageIndex].content.length)}}else{// 普通文本数据直接追加// 使用数组索引直接更新强制 Vue 响应式系统检测变化currentMessages.value[aiMessageIndex].contentdataconsole.log([SSE] Content updated:,currentMessages.value[aiMessageIndex].content.length)}// 每收到一条数据就立即更新 DOM// 强制更新整个数组以触发响应式currentMessages.value[...currentMessages.value]// 使用 requestAnimationFrame 强制浏览器重排awaitnewPromise(resolve{requestAnimationFrame((){scrollToBottom()resolve()})})}}}// 流读取完成后的处理loading.valuefalsecurrentMessages.value[aiMessageIndex].meta{status:done}currentMessages.value[...currentMessages.value]// 同步到 sessions 存储if(!tempSession.valuecurrentSessionId.valuesessions.value[currentSessionId.value]){constsessMsgssessions.value[currentSessionId.value].messagesif(Array.isArray(sessMsgs)sessMsgs.length){constlastIndexsessMsgs.length-1if(sessMsgs[lastIndex]sessMsgs[lastIndex].roleassistant){sessMsgs[lastIndex].contentcurrentMessages.value[aiMessageIndex].content}}}}catch(err){console.error(Stream error:,err)loading.valuefalsecurrentMessages.value[aiMessageIndex].meta{status:error}currentMessages.value[...currentMessages.value]ElMessage.error(流式传输出错)}}