2026/4/7 7:24:21
网站建设
项目流程
哪里有免费的网站域名,西安短视频代运营,怎样在本机建设网站,国人在线做网站怎么样流式传输技术详解#xff1a;从概念到实现的全过程
目录
什么是流式传输#xff1f;流式传输的实现要求流式传输的三个层面适配层与包装层的实现消息类型分类逻辑完整流程示例总结与最佳实践 什么是流式传输#xff1f;
**流式传输#xff08;Streaming#xff09;**是…流式传输技术详解从概念到实现的全过程目录什么是流式传输流式传输的实现要求流式传输的三个层面适配层与包装层的实现消息类型分类逻辑完整流程示例总结与最佳实践什么是流式传输**流式传输Streaming**是一种数据传输方式数据以连续流的形式逐步发送和接收而不是一次性传输全部数据。主要特点实时性数据生成后立即发送无需等待全部完成渐进式接收端可以边接收边处理低延迟用户能更快看到结果内存友好不需要在内存中保存全部数据应用场景聊天应用实时消息文件上传/下载进度实时日志监控AI 模型生成逐字输出← 本文重点数据导入/导出进度流式传输的实现要求⚠️ 重要原则双方都必须实现流式传输需要发送方和接收方都实现才能正常工作组合结果✅ 后端流式 前端流式正常工作实时显示进度❌ 后端流式 前端非流式无法正常工作或体验差⚠️ 后端非流式 前端流式可以工作但无实时效果✅ 后端非流式 前端非流式传统方式等待全部完成发送方后端实现# Flask 流式响应defgenerate():yieldfdata:{json.dumps({type:info,message:开始处理...},ensure_asciiFalse)}\n\n# 处理逻辑...yieldfdata:{json.dumps({type:token,token:文本片段},ensure_asciiFalse)}\n\nreturnResponse(stream_with_context(generate()),mimetypetext/event-stream,headers{Cache-Control:no-cache,Connection:keep-alive})接收方前端实现// Fetch API 流式读取constreaderresponse.body.getReader();constdecodernewTextDecoder();letbuffer;while(true){const{done,value}awaitreader.read();if(done)break;bufferdecoder.decode(value,{stream:true});constlinesbuffer.split(\n);bufferlines.pop();for(constlineoflines){if(line.startsWith(data: )){constmsgJSON.parse(line.substring(6));// 处理消息...}}}流式传输的三个层面流式传输的实现涉及三个不同的层面每个层面都有不同的格式要求1. 传输协议层标准统一Server-Sent Events (SSE)- Web 标准协议# 后端设置 SSE 响应头mimetypetext/event-streamheaders{Cache-Control:no-cache,Connection:keep-alive,...}SSE 格式规范每行以data:开头每行以\n\n结尾两个换行符这是标准格式所有浏览器都支持2. 数据格式层项目自定义自定义 JSON 消息格式# 后端发送的消息格式{type:token,# 消息类型标识token:文本片段# 实际的文本内容}消息类型info: 状态信息error: 错误信息results: 搜索结果token: 答案片段流式生成answer: 完整答案final: 最终数据3. 大模型 API 层模型特定不同大模型库有不同的流式格式模型库流式格式Ollama{delta: 文本}或{text: 文本}LlamaIndex{delta: 文本}或{response: Object}OpenAI{delta: 文本}适配层与包装层的实现这是流式传输的核心技术如何统一不同格式。完整流程大模型库Ollama/LlamaIndex ↓ {delta: 文本} / {text: 文本} / {response: Object} ↓ ┌─────────────────────────────────────────┐ │ 适配层Adapter Layer │ │ rag_generate_with_llamaindex_stream() │ │ │ │ 检查格式 → 提取文本 → 统一为字符串 │ └─────────────────────────────────────────┘ ↓ 文本片段 (字符串) ↓ ┌─────────────────────────────────────────┐ │ 包装层Wrapper Layer │ │ search_alarm() 路由函数 │ │ │ │ 字符串 → JSON对象 → JSON字符串 → SSE │ └─────────────────────────────────────────┘ ↓ data: {type:token,token:文本片段}\n\n ↓ 前端接收并显示适配层详解位置rag_generate_with_llamaindex_stream()函数作用将不同大模型库的格式统一为字符串场景1Ollama LLM 直接调用# 调用 Ollama 的流式 APIresponse_streamSettings.llm.stream_complete(prompt)# 适配层 - 处理 Ollama 的不同返回格式fortokeninresponse_stream:# 情况1Ollama 返回 {delta: 文本片段}ifhasattr(token,delta)andtoken.delta:yieldtoken.delta# ← 提取 delta 属性# 情况2Ollama 返回 {text: 文本片段}elifhasattr(token,text):yieldtoken.text# ← 提取 text 属性# 情况3Ollama 返回其他格式字符串或对象else:token_strstr(token)# ← 转换为字符串iftoken_str:yieldtoken_str适配逻辑✅ 检查delta属性 → 提取文本✅ 检查text属性 → 提取文本✅ 其他情况 → 转换为字符串统一输出所有格式最终都 yield 字符串场景2LlamaIndex ChatEngine# 调用 LlamaIndex ChatEngine 的流式 APIresponse_streamchat_engine.stream_chat(query)# 适配层 - 处理 LlamaIndex 的不同返回格式forresponse_chunkinresponse_stream:# 情况1LlamaIndex 返回 {delta: 文本片段}ifhasattr(response_chunk,delta)andresponse_chunk.delta:yieldresponse_chunk.delta# 情况2LlamaIndex 返回 {response: ResponseObject}elifhasattr(response_chunk,response):response_textstr(response_chunk.response)ifresponse_text:yieldresponse_text# 情况3直接是字符串或其他格式else:chunk_strstr(response_chunk)ifchunk_str:yieldchunk_str场景3LlamaIndex QueryEngine# 调用 QueryEngine 的流式 APIstreaming_responsequery_engine.query(enhanced_query)# 适配层 - 处理 QueryEngine 的不同返回格式ifhasattr(streaming_response,response_gen):# 情况1返回 StreamingResponse 对象有 response_gen 生成器fortext_chunkinstreaming_response.response_gen:iftext_chunk:yieldtext_chunkelifhasattr(streaming_response,__iter__)andnotisinstance(streaming_response,str):# 情况2直接是生成器forchunkinstreaming_response:ifhasattr(chunk,delta)andchunk.delta:yieldchunk.deltaelse:chunk_strstr(chunk)ifchunk_str:yieldchunk_strelse:# 情况3非流式返回完整响应response_textstr(streaming_response)yieldresponse_text包装层详解位置search_alarm()路由函数作用将适配层输出的字符串包装成自定义 JSON 格式并封装为 SSE 格式包装过程# 调用适配层获取字符串流answer_streamrag_generate_with_llamaindex_stream(query,results,chat_history_listcurrent_history)# 包装层 - 将字符串包装成 JSON SSE 格式fortokeninanswer_stream:# ← token 是字符串来自适配层iftokenisNone:continue# 确保是字符串类型token_strstr(token)ifnotisinstance(token,str)elsetokeniftoken_str:# 包装成自定义 JSON 格式json_data{type:token,# ← 消息类型token:token_str# ← 文本片段}# 转换为 JSON 字符串json_strjson.dumps(json_data,ensure_asciiFalse)# 包装成 SSE 格式data: {...}\n\nyieldfdata:{json_str}\n\n包装格式说明自定义 JSON 格式{type:token,// 消息类型标识token:文本片段// 实际的文本内容}SSE 格式data: {type:token,token:文本片段}\n\n设计优势适配层屏蔽不同库的格式差异统一输出字符串包装层将字符串包装为应用层统一的消息格式解耦更换大模型库时只需修改适配层扩展新增消息类型只需在包装层添加这就是适配器模式 包装器模式的组合应用。消息类型分类逻辑后端根据流程阶段和状态决定发送哪种类型的消息消息类型定义消息类型发送时机发送条件数据内容info流程关键节点搜索开始/完成、RAG开始message字符串error出现错误验证失败、配置错误、执行异常message错误信息results搜索完成搜索完成后无论是否有结果data.results结果列表tokenRAG流式生成每个文本片段生成时token文本片段answerRAG生成完成生成成功且有答案answer完整答案final流程结束所有情况成功/失败data包含所有最终数据分类逻辑详解1.info- 状态信息发送时机流程中的关键节点# 搜索开始时yieldfdata:{json.dumps({type:info,message:开始搜索知识库...},ensure_asciiFalse)}\n\n# 找到精确匹配结果时yieldfdata:{json.dumps({type:info,message:f找到{len(results)}个精确匹配结果},ensure_asciiFalse)}\n\n# 开始向量搜索时yieldfdata:{json.dumps({type:info,message:正在进行向量搜索...},ensure_asciiFalse)}\n\n# 搜索完成时yieldfdata:{json.dumps({type:info,message:f找到{len(results)}个相关结果},ensure_asciiFalse)}\n\n# 开始生成RAG回答时yieldfdata:{json.dumps({type:info,message:开始生成智能回答...},ensure_asciiFalse)}\n\n分类逻辑✅ 流程开始搜索开始✅ 搜索阶段精确匹配完成、向量搜索开始、搜索完成✅ RAG阶段开始生成回答2.error- 错误信息发送时机出现错误时# 查询为空ifnotquery:yieldfdata:{json.dumps({type:error,message:查询内容不能为空},ensure_asciiFalse)}\n\n# LLM未初始化ifSettings.llmisNone:error_msgLlamaIndex LLM 未初始化无法生成回答。请检查 Ollama 服务是否运行。yieldfdata:{json.dumps({type:error,message:error_msg},ensure_asciiFalse)}\n\n# 流式生成失败ifnotfull_answer:error_msg流式生成失败未收到有效回答yieldfdata:{json.dumps({type:error,message:error_msg},ensure_asciiFalse)}\n\n# 异常捕获exceptExceptionase:error_msgf流式生成过程出错:{str(e)}yieldfdata:{json.dumps({type:error,message:error_msg},ensure_asciiFalse)}\n\n分类逻辑✅ 输入验证错误查询为空✅ 配置错误LLM未初始化、RAG未启用✅ 执行错误生成失败、异常3.results- 搜索结果发送时机搜索完成后# 搜索完成后发送结果yieldfdata:{json.dumps({type:results,data:{results:results,# 搜索结果列表包含 images 字段count:len(results),# 结果数量search_mode:search_mode# 搜索模式}},ensure_asciiFalse)}\n\n分类逻辑✅ 条件搜索完成无论是否有结果✅ 数据结果列表、数量、搜索模式✅图片数据每个结果对象的images字段包含图片路径数组4.token- 答案片段流式生成发送时机RAG流式生成过程中# 流式生成答案片段fortokeninanswer_stream:iftokenisNone:continuetoken_strstr(token)ifnotisinstance(token,str)elsetokeniftoken_str:full_answertoken_str# 每个文本片段都发送一次 token 消息yieldfdata:{json.dumps({type:token,token:token_str},ensure_asciiFalse)}\n\n分类逻辑✅ 条件启用RAG且流式生成中✅ 频率每个文本片段发送一次✅ 数据单个文本片段5.answer- 完整答案发送时机RAG生成完成后# 生成完成后发送完整答案iffull_answer:yieldfdata:{json.dumps({type:answer,answer:full_answer,# 完整答案references:results[:5]ifresultselse[]# 参考来源包含 images},ensure_asciiFalse)}\n\n分类逻辑✅ 条件RAG生成成功且有答案✅ 数据完整答案、参考来源✅图片数据references中每个对象的images字段6.final- 最终数据发送时机整个流程结束时# RAG成功时有答案yieldfdata:{json.dumps({type:final,data:{success:True,answer:full_answer,results:results,# 所有结果包含 imagesreferences:results[:5],# 参考来源包含 imagessession_id:session_id}},ensure_asciiFalse)}\n\n# 不使用RAG时只有搜索结果yieldfdata:{json.dumps({type:final,data:{success:True,results:results,# 所有结果包含 imagessession_id:session_id}},ensure_asciiFalse)}\n\n分类逻辑✅ 条件流程结束成功或失败✅ 数据包含所有最终数据答案、结果、错误、会话ID等✅图片数据results和references中都包含images字段完整流程示例场景1传统搜索模式不使用RAG1. info: 开始搜索知识库... ↓ 2. info: 找到 5 个相关结果 或 正在进行向量搜索... ↓ 3. results: { results: [ {id: 1, images: [img1.jpg], ...}, {id: 2, images: [img2.jpg], ...}, ... ], count: 5 } ↓ 4. final: { success: true, results: [{images: [...]}, ...], session_id: ... }图片返回时机✅results消息所有搜索结果的图片✅final消息所有搜索结果的图片再次返回场景2RAG模式成功1. info: 开始搜索知识库... ↓ 2. info: 找到 5 个相关结果 ↓ 3. results: { results: [{images: [...]}, ...], count: 5 } ↓ 4. info: 开始生成智能回答... ↓ 5. token: 根据 ↓ 6. token: 知识库 ↓ 7. token: 中的信息 ... (多个token逐字显示) ↓ 8. answer: { answer: 完整答案..., references: [{images: [...]}, ...] // 前5条结果的图片 } ↓ 9. final: { success: true, answer: 完整答案..., results: [{images: [...]}, ...], // 所有结果的图片 references: [{images: [...]}, ...], // 参考来源的图片 session_id: ... }图片返回时机✅results消息所有搜索结果的图片第一次✅answer消息前5条参考结果的图片第二次✅final消息所有结果和参考来源的图片第三次和第四次场景3RAG模式失败1. info: 开始搜索知识库... ↓ 2. results: {results: [{images: [...]}, ...], count: 5} ↓ 3. info: 开始生成智能回答... ↓ 4. error: LlamaIndex LLM 未初始化... ↓ 5. final: { success: true, results: [{images: [...]}, ...], rag_error: ..., session_id: ... }图片数据的返回时机重要发现图片不是单独返回的图片数据不是单独返回而是作为搜索结果的一部分每个结果对象都有一个images字段数组。图片返回的多个时机返回时机消息类型包含字段说明搜索完成后resultsdata.results[].images所有搜索结果的图片RAG生成完成后answerreferences[].images前5条参考结果的图片流程结束时finaldata.results[].imagesdata.references[].images所有结果和参考来源的图片图片数据结构{id:1,alarm_code:E001,alarm_message:温度传感器故障,solution:检查传感器连接...,images:[uploads/images/sensor1.jpg,uploads/images/sensor2.jpg],...}总结与最佳实践核心技术要点三个层面分离传输协议层SSE标准统一数据格式层JSON项目自定义大模型API层模型特定适配器模式适配层统一不同库的格式差异包装层统一应用层的消息格式消息类型设计按流程阶段分类实时反馈 最终数据错误处理完善实现建议统一定义消息类型# 使用常量定义classMessageType:INFOinfoERRORerrorRESULTSresultsTOKENtokenANSWERanswerFINALfinal错误处理所有错误都发送error消息最终都发送final消息包含错误信息性能优化快速搜索直接返回完整结果RAG模式流式生成实时显示用户体验实时状态反馈info逐字显示答案token完整数据保证final设计模式应用适配器模式适配不同大模型库的格式包装器模式统一应用层的消息格式观察者模式前端监听流式消息并更新UI结语流式传输是一个涉及多个层面的复杂技术需要✅理解三个层面传输协议、数据格式、大模型API✅实现适配层统一不同库的格式差异✅实现包装层统一应用层的消息格式✅设计消息类型按流程阶段分类✅前后端配合双方都必须实现流式处理通过这种设计我们实现了实时反馈用户可以看到搜索和生成进度✍️逐字显示答案逐字显示类似 ChatGPT向后兼容传统搜索模式不受影响️错误处理完善的错误处理和提示希望这篇文章能帮助你理解流式传输的完整实现过程