单页网站制作视频教程整屏网站模板
单页网站制作视频教程,整屏网站模板,三亚河北建设招聘信息网站,wordpress 修改固定链接 404一、RAG到底在解决什么问题在动手之前#xff0c;我想先聊聊RAG这个概念#xff0c;因为很多刚接触的朋友容易搞混。大模型很强#xff0c;但它有两个致命弱点#xff1a;
第一#xff0c;知识有截止日期。 GPT-4的训练数据截止到某个时间点#xff0c;它不知道你们公司上…一、RAG到底在解决什么问题在动手之前我想先聊聊RAG这个概念因为很多刚接触的朋友容易搞混。大模型很强但它有两个致命弱点第一知识有截止日期。GPT-4的训练数据截止到某个时间点它不知道你们公司上周发布的新规范也不知道你们昨天刚修复的那个bug是怎么解决的。第二会一本正经地胡说八道。当大模型遇到它不知道的问题时它不会老老实实说我不知道而是会基于它学过的通用知识给你编一个看起来很合理但其实是错的答案。这就是所谓的幻觉Hallucination。RAGRetrieval-Augmented Generation检索增强生成的核心思路其实很简单别让大模型靠想象力答题先帮它把参考资料找出来让它照着资料回答。具体来说分三步把你的私有文档切成小块转成向量存起来用户提问时先根据问题检索出最相关的文档片段把问题和检索到的内容一起喂给大模型让它基于这些材料生成答案听起来不复杂对吧我当时也是这么想的然后就踩了一堆坑。二、第一个大坑文档切分没那么简单我最初的方案特别粗暴——用LangChain的RecursiveCharacterTextSplitter设置chunk_size500overlap50直接把所有文档切成小块。代码写起来确实很简单from langchain.text_splitter import RecursiveCharacterTextSplitterdefnaive_split(text): 最初的简单切分方案——后来证明这是个坑 splitter RecursiveCharacterTextSplitter( chunk_size500, chunk_overlap50, separators[\n\n, \n, 。, , , , ] ) chunks splitter.split_text(text)return chunks# 测试一下sample_text # MySQL主从切换操作手册## 1. 前置检查在执行主从切换之前必须完成以下检查- 确认从库同步状态正常Seconds_Behind_Master 0- 确认没有正在执行的大事务- 通知相关业务方确认切换时间窗口## 2. 切换步骤2.1 在主库执行只读设置SET GLOBAL read_only 1;2.2 等待从库完全同步在从库执行 SHOW SLAVE STATUS确认 Seconds_Behind_Master 02.3 停止从库复制STOP SLAVE;RESET SLAVE ALL;## 3. 回滚方案如果切换失败按以下步骤回滚...chunks naive_split(sample_text)for i, chunk inenumerate(chunks):print(f Chunk {i1} )print(chunk[:100] ... iflen(chunk) 100else chunk)看起来没毛病是吧但实际用起来问题大了。有一次用户问MySQL切换前需要做哪些检查系统返回的文档片段是这样的确认没有正在执行的大事务- 通知相关业务方确认切换时间窗口## 2. 切换步骤2.1 在主库执行只读设置SET GLOBAL read_only 1;发现问题了吗这个片段恰好从检查步骤的中间切开了第一条检查项确认从库同步状态正常被切到了上一个chunk里。用户问的是需要做哪些检查结果我们给大模型的参考资料里第一条检查项就没包含进去。核心教训机械地按字数切分会打断文档的语义完整性。后来我改成了基于语义结构的切分策略import refrom typing importList, DictclassSmartDocumentSplitter: 语义感知的文档切分器 核心思路尊重文档的原有结构按标题、段落等语义边界切分def__init__(self, max_chunk_size800, min_chunk_size100): self.max_chunk_size max_chunk_size self.min_chunk_size min_chunk_sizedefsplit_markdown(self, text: str) - List[Dict]: 针对Markdown文档的切分 保持标题层级结构每个chunk都带上完整的上下文路径 chunks [] current_headers {1: , 2: , 3: } # 记录当前的标题层级# 按行处理识别标题和内容 lines text.split(\n) current_content []for line in lines:# 检测Markdown标题 header_match re.match(r^(#{1,3})\s(.)$, line)if header_match:# 遇到新标题先保存之前的内容if current_content: chunk_text \n.join(current_content).strip()iflen(chunk_text) self.min_chunk_size: chunks.append({content: chunk_text,headers: dict(current_headers),context_path: self._build_context_path(current_headers) }) current_content []# 更新标题层级 level len(header_match.group(1)) title header_match.group(2) current_headers[level] title# 清除下级标题for l inrange(level 1, 4): current_headers[l] current_content.append(line)else: current_content.append(line)# 如果当前内容超过最大长度强制切分但尽量在段落边界 content_so_far \n.join(current_content)iflen(content_so_far) self.max_chunk_size: chunk_text content_so_far.strip() chunks.append({content: chunk_text,headers: dict(current_headers),context_path: self._build_context_path(current_headers) }) current_content []# 别忘了最后一段if current_content: chunk_text \n.join(current_content).strip()iflen(chunk_text) self.min_chunk_size: chunks.append({content: chunk_text,headers: dict(current_headers),context_path: self._build_context_path(current_headers) })return chunksdef_build_context_path(self, headers: Dict) - str: 构建层级路径比如MySQL主从切换 前置检查 path_parts [h for h in [headers[1], headers[2], headers[3]] if h]return .join(path_parts) if path_parts else未分类defenrich_chunk_with_context(self, chunk: Dict) - str: 关键技巧给每个chunk加上上下文前缀 这样即使单独看这个片段也能知道它属于哪个章节 context f[文档路径{chunk[context_path]}]\n\nreturn context chunk[content]# 实际使用示例splitter SmartDocumentSplitter(max_chunk_size800)chunks splitter.split_markdown(sample_text)print(f切分后共 {len(chunks)} 个片段\n)for i, chunk inenumerate(chunks):print(f Chunk {i1} )print(f路径{chunk[context_path]})print(f内容预览{chunk[content][:150]}...)print()这样切出来的效果就好多了。每个chunk开头都会带上它的位置信息大模型在回答时能更准确地理解这段内容的上下文。不过说实话这个方案也不是万能的。对于那些格式不规范的老文档没有清晰的标题结构切分效果依然一般。后来我又针对不同类型的文档做了差异化处理这个我们后面再说。三、第二个大坑向量检索的语义鸿沟解决了切分问题下一步就是向量化和检索了。我用的是开源的BGE模型做Embedding用Milvus做向量数据库。第一版的检索代码很直白from sentence_transformers import SentenceTransformerfrom pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utilityimport numpy as npclassVectorStore: 向量存储和检索def__init__(self, model_nameBAAI/bge-base-zh-v1.5):# 加载Embedding模型 self.model SentenceTransformer(model_name) self.dim 768# BGE base模型的向量维度# 连接Milvus connections.connect(default, hostlocalhost, port19530)defcreate_collection(self, collection_name: str): 创建集合if utility.has_collection(collection_name): utility.drop_collection(collection_name) fields [ FieldSchema(nameid, dtypeDataType.INT64, is_primaryTrue, auto_idTrue), FieldSchema(namecontent, dtypeDataType.VARCHAR, max_length4096), FieldSchema(namecontext_path, dtypeDataType.VARCHAR, max_length512), FieldSchema(nameembedding, dtypeDataType.FLOAT_VECTOR, dimself.dim) ] schema CollectionSchema(fields, description知识库文档) collection Collection(collection_name, schema)# 创建索引 index_params { metric_type: COSINE, index_type: IVF_FLAT, params: {nlist: 128} } collection.create_index(embedding, index_params)return collectiondefinsert_documents(self, collection_name: str, chunks: list): 插入文档 collection Collection(collection_name) contents [chunk[content] for chunk in chunks] context_paths [chunk[context_path] for chunk in chunks]# 批量生成Embedding embeddings self.model.encode(contents, normalize_embeddingsTrue) collection.insert([contents, context_paths, embeddings.tolist()]) collection.flush()print(f成功插入 {len(chunks)} 条文档)defsearch(self, collection_name: str, query: str, top_k: int 5): 基础检索 collection Collection(collection_name) collection.load()# 生成查询向量 query_embedding self.model.encode([query], normalize_embeddingsTrue) results collection.search( dataquery_embedding.tolist(), anns_fieldembedding, param{metric_type: COSINE, params: {nprobe: 16}}, limittop_k, output_fields[content, context_path] )return results[0]基本功能是没问题的。但实际跑起来我发现了一个让人抓狂的现象——用户的口语化提问和文档的正式表述之间存在巨大的语义鸿沟。举个例子用户问数据库挂了怎么办文档标题是MySQL服务异常恢复操作手册这两个在语义上是相关的但向量相似度可能并不高。因为用户说的挂了和文档里的异常用词差异很大。更坑的是有时候检索出的Top 5结果里真正相关的那篇可能只排在第3或第4位但前两名是一些看起来相关但实际上文不对题的内容。如果我只取Top 3喂给大模型可能就漏掉了最关键的信息。后来我采用了一个两阶段检索的策略先用向量检索做粗筛再用重排序模型做精排。from transformers import AutoModelForSequenceClassification, AutoTokenizerimport torchclassEnhancedRetriever: 增强版检索器向量检索 重排序def__init__(self, vector_store: VectorStore): self.vector_store vector_store# 加载重排序模型BGE Reranker效果不错 self.reranker_tokenizer AutoTokenizer.from_pretrained(BAAI/bge-reranker-base) self.reranker_model AutoModelForSequenceClassification.from_pretrained(BAAI/bge-reranker-base) self.reranker_model.eval()defretrieve_with_rerank(self, collection_name: str, query: str, initial_top_k: int 20, final_top_k: int 5): 两阶段检索1. 向量检索召回 initial_top_k 个候选2. 用重排序模型精排返回 final_top_k 个结果# 第一阶段向量检索召回更多候选 initial_results self.vector_store.search(collection_name, query, top_kinitial_top_k)ifnot initial_results:return []# 准备重排序 candidates []for hit in initial_results: candidates.append({content: hit.entity.get(content),context_path: hit.entity.get(context_path),vector_score: hit.score # 保留向量检索得分用于调试 })# 第二阶段重排序 rerank_scores self._compute_rerank_scores(query, [c[content] for c in candidates])for i, score inenumerate(rerank_scores): candidates[i][rerank_score] score# 按重排序分数排序 candidates.sort(keylambda x: x[rerank_score], reverseTrue)return candidates[:final_top_k]def_compute_rerank_scores(self, query: str, documents: list) - list: 计算query和每个文档的相关性分数 scores []with torch.no_grad():for doc in documents:# Reranker的输入格式是 [query, document] inputs self.reranker_tokenizer( [[query, doc]], paddingTrue, truncationTrue, max_length512, return_tensorspt ) outputs self.reranker_model(**inputs) score outputs.logits.squeeze().item() scores.append(score)return scoresdefretrieve_with_query_expansion(self, collection_name: str, query: str, llm_client, top_k: int 5): 进阶技巧查询扩展 用大模型改写用户问题生成多个变体再合并检索结果# 让大模型帮我们扩展查询 expansion_prompt f请将下面这个问题改写成3个不同的表达方式保持意思相同但用词不同。每行输出一个改写结果不要序号不要其他解释。原问题{query} expanded_queries llm_client.generate(expansion_prompt).strip().split(\n) expanded_queries [q.strip() for q in expanded_queries if q.strip()]# 加上原始查询 all_queries [query] expanded_queries[:3] # 最多取3个扩展查询print(f扩展后的查询{all_queries}) # 调试用# 对每个查询分别检索 all_candidates {}for q in all_queries: results self.vector_store.search(collection_name, q, top_k10)for hit in results: content hit.entity.get(content)if content notin all_candidates: all_candidates[content] {content: content,context_path: hit.entity.get(context_path),best_score: hit.score,hit_count: 1 }else:# 被多个查询命中的文档增加权重 all_candidates[content][hit_count] 1 all_candidates[content][best_score] max( all_candidates[content][best_score], hit.score )# 综合评分命中次数 * 最高得分 candidates list(all_candidates.values())for c in candidates: c[combined_score] c[hit_count] * c[best_score] candidates.sort(keylambda x: x[combined_score], reverseTrue)return candidates[:top_k]查询扩展这招特别好用。比如用户问数据库挂了怎么办大模型可能会扩展成MySQL服务故障如何处理数据库无法连接的解决方案数据库宕机恢复步骤这几个查询一起检索能覆盖更多的相关文档。四、第三个大坑Prompt工程的门道比想象中深检索的问题解决了接下来就是把检索到的内容和用户问题一起喂给大模型了。这一步我本以为最简单没想到也踩了不少坑。最初的Prompt特别朴素defbuild_naive_prompt(query: str, context_docs: list) - str: 最初的简单Prompt——后来证明太天真了 context \n\n.join([doc[content] for doc in context_docs]) prompt f根据以下参考资料回答用户问题。参考资料{context}用户问题{query}请回答return prompt这个Prompt有几个严重问题问题一大模型不知道什么时候该说不知道。当参考资料里确实没有答案时它还是会编一个出来。问题二没有引导大模型说明信息来源。用户看到答案不知道是从哪篇文档里来的无法追溯和验证。问题三对于复杂问题回答的结构不够清晰。后来迭代了很多版最终稳定下来的Prompt是这样的defbuild_rag_prompt(query: str, context_docs: list, include_sources: bool True) - str: 生产环境使用的Prompt模板 关键设计明确角色定位、限制回答范围、要求标注来源# 格式化上下文每段都标注来源 context_parts []for i, doc inenumerate(context_docs, 1): source doc.get(context_path, 未知来源) context_parts.append(f【资料{i}来源{source}】\n{doc[content]}) context \n\n\n\n.join(context_parts) prompt f你是一个企业内部知识库助手专门帮助员工查找和理解公司内部文档。## 你的工作准则1. **只根据提供的参考资料回答问题**不要使用你自己的知识。2. 如果参考资料中没有相关信息请明确说根据现有资料我无法找到关于这个问题的信息并建议用户联系相关部门或换个关键词搜索。3. 回答时请标注信息来源格式如【资料1】方便用户追溯原文。4. 对于操作类问题请按步骤清晰地列出对于概念类问题先给出简明定义再展开解释。5. 如果不同资料中的信息有冲突请指出差异并说明各自的适用场景。## 参考资料{context}## 用户问题{query}## 回答要求请根据上述参考资料回答用户问题。记住- 只使用参考资料中的信息- 标注信息来源- 没有把握的内容不要编造return promptdefbuild_conversational_prompt(query: str, context_docs: list, chat_history: list None) - str: 支持多轮对话的Prompt 需要带上历史对话记录让大模型理解上下文 context_parts []for i, doc inenumerate(context_docs, 1): source doc.get(context_path, 未知来源) context_parts.append(f【资料{i}来源{source}】\n{doc[content]}) context \n\n\n\n.join(context_parts)# 格式化历史对话 history_text if chat_history: history_parts []for turn in chat_history[-5:]: # 只保留最近5轮避免太长 history_parts.append(f用户{turn[user]}) history_parts.append(f助手{turn[assistant]}) history_text \n.join(history_parts) prompt f你是一个企业内部知识库助手。## 参考资料{context}## 对话历史{history_text if history_text else 这是对话的开始}## 当前问题用户{query}## 回答准则1. 优先根据参考资料回答如无相关信息请明确说明2. 考虑对话历史的上下文如用户说它可能指代之前提到的概念3. 标注信息来源助手return prompt关于Prompt我还想分享一个很重要的经验不要试图在一个Prompt里塞太多指令。一开始我把各种要求都写进去回答要准确、要简洁、要友好、要专业、要标注来源、要分步骤、遇到不确定要说不知道……结果发现模型反而被绕晕了有时候顾了这个忘了那个。后来我的做法是区分核心指令和优化指令核心指令必须保留优化指令可以根据问题类型动态调整。classPromptBuilder: Prompt构建器根据问题类型动态调整# 核心指令——任何情况都必须包含 CORE_INSTRUCTIONS 1. 只使用参考资料中的信息回答不要编造2. 资料中没有的信息明确说无法找到相关信息3. 标注信息来源【资料X】# 操作类问题的额外指令 PROCEDURE_INSTRUCTIONS 回答格式要求- 按步骤编号列出第一步、第二步...- 每个步骤要明确操作对象和操作动作- 重要的警告或注意事项用⚠️标出# 概念解释类问题的额外指令 CONCEPT_INSTRUCTIONS 回答格式要求- 先用一句话给出核心定义- 再详细解释关键点- 如有必要举例说明# 故障排查类问题的额外指令 TROUBLESHOOT_INSTRUCTIONS 回答格式要求- 先列出可能的原因- 针对每个原因给出排查方法- 给出解决方案或规避建议 classmethoddefbuild(cls, query: str, context_docs: list, question_type: str general) - str: 根据问题类型构建Prompt# 简单的问题分类逻辑实际项目中可以用分类模型if question_type auto: question_type cls._classify_question(query) extra_instructions if question_type procedure: extra_instructions cls.PROCEDURE_INSTRUCTIONSelif question_type concept: extra_instructions cls.CONCEPT_INSTRUCTIONSelif question_type troubleshoot: extra_instructions cls.TROUBLESHOOT_INSTRUCTIONS context cls._format_context(context_docs) prompt f你是企业内部知识库助手。## 必须遵守的规则{cls.CORE_INSTRUCTIONS}{f## 回答格式{extra_instructions} if extra_instructions else }## 参考资料{context}## 用户问题{query}请回答return prompt classmethoddef_classify_question(cls, query: str) - str: 简单的问题分类基于关键词 procedure_keywords [怎么做, 如何操作, 步骤, 流程, 怎样] concept_keywords [是什么, 什么是, 定义, 解释, 区别] troubleshoot_keywords [为什么, 报错, 失败, 异常, 问题, 故障] query_lower query.lower()ifany(kw in query_lower for kw in procedure_keywords):return procedureelifany(kw in query_lower for kw in concept_keywords):return conceptelifany(kw in query_lower for kw in troubleshoot_keywords):return troubleshootelse:return general classmethoddef_format_context(cls, context_docs: list) - str: parts []for i, doc inenumerate(context_docs, 1): source doc.get(context_path, 未知来源) parts.append(f【资料{i}来源{source}】\n{doc[content]})return \n\n\n\n.join(parts)五、串起来完整的RAG Pipeline前面说了一堆细节现在把它们串成一个完整的Pipelinefrom openai import OpenAIfrom typing importList, Dict, Optionalimport jsonclassRAGPipeline: 完整的RAG处理流程 文档切分 - 向量化存储 - 检索 - 重排序 - 生成回答def__init__(self, llm_base_url: str https://api.deepseek.com, llm_api_key: str your-api-key, llm_model: str deepseek-chat):# 初始化各个组件 self.splitter SmartDocumentSplitter(max_chunk_size800) self.vector_store VectorStore() self.retriever EnhancedRetriever(self.vector_store)# 初始化LLM客户端这里用DeepSeek也可以换成其他的 self.llm_client OpenAI(base_urlllm_base_url, api_keyllm_api_key) self.llm_model llm_model self.collection_name knowledge_basedefingest_documents(self, documents: List[Dict]): 文档入库 documents格式[{title: 文档标题, content: 文档内容, source: 来源}]print(f开始处理 {len(documents)} 篇文档...) all_chunks []for doc in documents:# 在内容前加上标题帮助切分器识别结构 full_content f# {doc[title]}\n\n{doc[content]} chunks self.splitter.split_markdown(full_content)# 给每个chunk加上文档来源信息for chunk in chunks: chunk[source_doc] doc.get(source, doc[title]) all_chunks.extend(chunks)print(f切分后共 {len(all_chunks)} 个片段)# 创建集合并插入 self.vector_store.create_collection(self.collection_name) self.vector_store.insert_documents(self.collection_name, all_chunks)print(文档入库完成)defquery(self, question: str, chat_history: Optional[List[Dict]] None, top_k: int 5, use_rerank: bool True) - Dict: 处理用户查询 返回{answer: 回答内容, sources: [引用的来源], retrieved_docs: [检索到的文档]}# 1. 检索相关文档if use_rerank: retrieved_docs self.retriever.retrieve_with_rerank( self.collection_name, question, initial_top_k20, final_top_ktop_k )else: results self.vector_store.search(self.collection_name, question, top_ktop_k) retrieved_docs [{content: hit.entity.get(content),context_path: hit.entity.get(context_path),score: hit.score } for hit in results]ifnot retrieved_docs:return { answer: 抱歉我没有找到与您问题相关的资料。您可以尝试换个关键词或联系相关部门获取帮助。, sources: [], retrieved_docs: [] }# 2. 构建Promptif chat_history: prompt build_conversational_prompt(question, retrieved_docs, chat_history)else: prompt PromptBuilder.build(question, retrieved_docs, question_typeauto)# 3. 调用LLM生成回答 response self.llm_client.chat.completions.create( modelself.llm_model, messages[{role: user, content: prompt}], temperature0.3, # 知识库问答用较低的temperature max_tokens2000 ) answer response.choices[0].message.content# 4. 提取引用的来源 sources list(set([doc.get(context_path, 未知来源) for doc in retrieved_docs]))return { answer: answer, sources: sources, retrieved_docs: retrieved_docs }defevaluate_response(self, question: str, answer: str, ground_truth: str None) - Dict: 回答质量评估可选 用LLM评估回答的质量方便持续优化 eval_prompt f请评估以下问答的质量。问题{question}回答{answer}{f参考答案{ground_truth} if ground_truth else }请从以下维度评分1-5分并说明理由1. 相关性回答是否切题2. 准确性信息是否正确3. 完整性是否完整解答了问题4. 可读性表述是否清晰易懂请用JSON格式输出{{relevance: 分数, accuracy: 分数, completeness: 分数, readability: 分数, comments: 评价说明}} response self.llm_client.chat.completions.create( modelself.llm_model, messages[{role: user, content: eval_prompt}], temperature0 )try: eval_result json.loads(response.choices[0].message.content)return eval_resultexcept:return {error: 评估结果解析失败}# 使用示例if __name__ __main__:# 初始化Pipeline rag RAGPipeline( llm_base_urlhttps://api.deepseek.com, llm_api_keyyour-api-key, llm_modeldeepseek-chat )# 准备测试文档 test_documents [ { title: MySQL主从切换操作手册, content: ## 1. 前置检查 在执行主从切换之前必须完成以下检查 - 确认从库同步状态正常Seconds_Behind_Master 0 - 确认没有正在执行的大事务 - 通知相关业务方确认切换时间窗口 ## 2. 切换步骤 ### 2.1 在主库执行只读设置 SET GLOBAL read_only 1; ### 2.2 等待从库完全同步 在从库执行 SHOW SLAVE STATUS确认 Seconds_Behind_Master 0 ### 2.3 停止从库复制并提升为主库 STOP SLAVE; RESET SLAVE ALL; SET GLOBAL read_only 0; ## 3. 切换后验证 - 确认新主库可以正常写入 - 确认应用连接已切换到新主库 - 监控新主库的性能指标 , source: DBA团队文档 } ]# 入库 rag.ingest_documents(test_documents)# 测试查询 result rag.query(MySQL切换前需要做哪些检查)print( * 50)print(问题MySQL切换前需要做哪些检查)print( * 50)print(f\n回答\n{result[answer]})print(f\n参考来源{result[sources]})六、上线后的一些经验教训系统上线到现在差不多两个月了期间又踩了不少坑这里挑几个印象最深的说说。教训一用户的问题千奇百怪我们在设计时假设用户会问MySQL怎么做主从切换这种正常问题。但实际上呢有人问上次那个事故怎么处理的来着——没有任何上下文系统根本不知道那个事故是哪个。有人问帮我写个SQL。——这根本不是知识库问答这是让大模型帮写代码。还有人问在吗——我也不知道他想干啥。后来我加了一个意图识别层先判断用户的问题是否属于知识库问答的范畴defclassify_intent(self, query: str) - str:识别用户意图 intent_prompt f判断用户输入的意图类别只输出类别名称- knowledge_query查询知识库信息如询问流程、规范、操作方法- code_request请求生成代码- chitchat闲聊或无明确意图- other其他用户输入{query}意图类别 response self.llm_client.chat.completions.create( modelself.llm_model, messages[{role: user, content: intent_prompt}], temperature0, max_tokens20 )return response.choices[0].message.content.strip()对于非知识库问答的意图给用户一个友好的提示而不是硬着头皮检索。教训二冷启动时的尴尬系统刚上线时知识库里的文档不多覆盖的场景有限。用户问了几个问题都答不上来体验特别差于是就不来用了。后来的解决办法上线前先梳理高频问题确保至少这些问题能回答好搞了一个问题收集功能对于答不上来的问题记录下来反馈给内容团队让他们补充相关文档做了一个兜底策略——如果检索不到高相关度的内容就展示相关推荐把一些相似度尚可的文档标题列出来引导用户自己去看教训三文档更新的同步问题知识库的文档是会更新的。老版本的操作手册废弃了新版本发布了。但如果向量数据库里还存着老版本的内容用户检索到的可能是过时信息。这个问题说起来简单做起来挺麻烦的。我们最后的方案是每个文档入库时记录版本号和更新时间定期全量重新入库我们是每周一次对于紧急更新的重要文档支持手动触发单篇重入库七、性能优化让系统不那么慢RAG系统有个让人头疼的问题——慢。整个流程跑一遍Embedding编码、向量检索、重排序、LLM生成全部加起来可能要好几秒。用户体验就很差问一个问题要等半天。几个优化措施import asynciofrom functools import lru_cacheimport hashlibclassOptimizedRAG: 性能优化版RAGdef__init__(self):# 缓存热门查询的结果 self.query_cache {} self.cache_ttl 3600# 1小时过期 lru_cache(maxsize1000)def_compute_query_embedding(self, query: str): Embedding结果缓存 同样的问题不用重复计算向量return self.model.encode([query], normalize_embeddingsTrue)[0]def_get_cache_key(self, query: str) - str: 生成缓存keyreturn hashlib.md5(query.lower().strip().encode()).hexdigest()asyncdefstream_query(self, question: str): 流式输出 不用等整个回答生成完边生成边输出 retrieved_docs await asyncio.to_thread( self.retriever.retrieve_with_rerank, self.collection_name, question, 20, 5 ) prompt PromptBuilder.build(question, retrieved_docs, question_typeauto)# 使用流式API stream self.llm_client.chat.completions.create( modelself.llm_model, messages[{role: user, content: prompt}], temperature0.3, streamTrue# 开启流式 )for chunk in stream:if chunk.choices[0].delta.content:yield chunk.choices[0].delta.content流式输出这一点特别重要。用户问完问题后马上就能看到回答在打字心理上就不会觉得那么慢了。八、回顾与思考把这套系统从被骂下线到成为部门标配前后折腾了将近一个月。趟过的坑挺多但收获也很大。几点核心总结1. RAG不是万能的选好适用场景RAG适合有明确知识库、答案可追溯的场景。如果你的需求是让大模型发挥创造力比如写文案、做创意那RAG反而是个约束。2. 切分和检索是根基大家往往把注意力放在大模型本身觉得用更强的模型就能解决问题。但实际上如果前面的切分和检索做得不好再强的模型也是巧妇难为无米之炊。3. Prompt工程真的是门手艺同样的检索结果不同的Prompt可能带来天壤之别的回答效果。这个没什么捷径就是多试、多看、多迭代。4. 上线只是开始真正的挑战在上线之后。用户的各种奇葩输入、文档的持续更新、性能的优化、效果的监控……每一项都是持续的工作。最后附上这套系统目前的一些核心指标日均查询量200次平均响应时间2.3秒开启流式后首字符延迟约0.8秒用户满意度通过回答后的点赞/点踩收集约72%无法回答的比例约22%这部分会定期分析推动补充文档普通人如何抓住AI大模型的风口领取方式在文末为什么要学习大模型目前AI大模型的技术岗位与能力培养随着人工智能技术的迅速发展和应用 大模型作为其中的重要组成部分 正逐渐成为推动人工智能发展的重要引擎 。大模型以其强大的数据处理和模式识别能力 广泛应用于自然语言处理 、计算机视觉 、 智能推荐等领域 为各行各业带来了革命性的改变和机遇 。目前开源人工智能大模型已应用于医疗、政务、法律、汽车、娱乐、金融、互联网、教育、制造业、企业服务等多个场景其中应用于金融、企业服务、制造业和法律领域的大模型在本次调研中占比超过30%。随着AI大模型技术的迅速发展相关岗位的需求也日益增加。大模型产业链催生了一批高薪新职业人工智能大潮已来不加入就可能被淘汰。如果你是技术人尤其是互联网从业者现在就开始学习AI大模型技术真的是给你的人生一个重要建议最后只要你真心想学习AI大模型技术这份精心整理的学习资料我愿意无偿分享给你但是想学技术去乱搞的人别来找我在当前这个人工智能高速发展的时代AI大模型正在深刻改变各行各业。我国对高水平AI人才的需求也日益增长真正懂技术、能落地的人才依旧紧缺。我也希望通过这份资料能够帮助更多有志于AI领域的朋友入门并深入学习。真诚无偿分享vx扫描下方二维码即可加上后会一个个给大家发【附赠一节免费的直播讲座技术大佬带你学习大模型的相关知识、学习思路、就业前景以及怎么结合当前的工作发展方向等欢迎大家~】大模型全套学习资料展示自我们与MoPaaS魔泊云合作以来我们不断打磨课程体系与技术内容在细节上精益求精同时在技术层面也新增了许多前沿且实用的内容力求为大家带来更系统、更实战、更落地的大模型学习体验。希望这份系统、实用的大模型学习路径能够帮助你从零入门进阶到实战真正掌握AI时代的核心技能01教学内容从零到精通完整闭环【基础理论 →RAG开发 → Agent设计 → 模型微调与私有化部署调→热门技术】5大模块内容比传统教材更贴近企业实战大量真实项目案例带你亲自上手搞数据清洗、模型调优这些硬核操作把课本知识变成真本事02适学人群应届毕业生无工作经验但想要系统学习AI大模型技术期待通过实战项目掌握核心技术。零基础转型非技术背景但关注AI应用场景计划通过低代码工具实现“AI行业”跨界。业务赋能突破瓶颈传统开发者Java/前端等学习Transformer架构与LangChain框架向AI全栈工程师转型。vx扫描下方二维码即可【附赠一节免费的直播讲座技术大佬带你学习大模型的相关知识、学习思路、就业前景以及怎么结合当前的工作发展方向等欢迎大家~】本教程比较珍贵仅限大家自行学习不要传播更严禁商用03入门到进阶学习路线图大模型学习路线图整体分为5个大的阶段04视频和书籍PDF合集从0到掌握主流大模型技术视频教程涵盖模型训练、微调、RAG、LangChain、Agent开发等实战方向新手必备的大模型学习PDF书单来了全是硬核知识帮你少走弯路不吹牛真有用05行业报告白皮书合集收集70报告与白皮书了解行业最新动态0690份面试题/经验AI大模型岗位面试经验总结谁学技术不是为了赚$呢找个好的岗位很重要07 deepseek部署包技巧大全由于篇幅有限只展示部分资料并且还在持续更新中…真诚无偿分享vx扫描下方二维码即可加上后会一个个给大家发【附赠一节免费的直播讲座技术大佬带你学习大模型的相关知识、学习思路、就业前景以及怎么结合当前的工作发展方向等欢迎大家~】