官方网站建设流程群晖 搭建两个wordpress
官方网站建设流程,群晖 搭建两个wordpress,做网站好还是网店,贵州省建设厅官网网站最近在做一个AI客服系统的升级项目#xff0c;客户反馈说#xff0c;面对一些步骤多、描述模糊的复杂问题#xff0c;比如“我想订一张下周五从北京飞上海#xff0c;中间能在南京停留两天的机票#xff0c;并且要用我的积分抵扣”#xff0c;传统的规则引擎或者简单的意…最近在做一个AI客服系统的升级项目客户反馈说面对一些步骤多、描述模糊的复杂问题比如“我想订一张下周五从北京飞上海中间能在南京停留两天的机票并且要用我的积分抵扣”传统的规则引擎或者简单的意图识别模型就有点力不从心了。要么是让用户反复确认体验很差要么就直接转人工成本很高。我们团队琢磨了很久最后决定尝试将大语言模型LLM和分层任务网络HTN结合起来搞一个智能任务分解的架构。跑了一段时间效果挺惊喜的复杂工单的处理效率提升了差不多3倍。今天就来跟大家聊聊我们是怎么做的踩了哪些坑以及一些性能调优的心得。1. 背景与痛点为什么传统方案会“卡壳”在深入技术细节前我们先看看老系统到底卡在哪了。传统的客服机器人或者一些初代的AI客服通常有两种路子基于规则/流程树把常见的用户问题预先画成一颗巨大的“决策树”。用户每说一句话系统就沿着树枝往下走一步。这种方法对于“查询余额”、“修改密码”这种有固定套路的问题还行但一旦用户的问题稍微复杂点或者描述顺序和预设的不一样系统就懵了很容易掉进“抱歉我不理解”的死循环。基于简单意图分类用一个分类模型把用户的一句话映射到几十个预设的意图标签上比如“订机票”、“退票”、“改签”。然后每个意图背后对应一个固定的处理函数。问题在于像开头的例子它虽然核心是“订机票”但里面嵌套了“多段行程”、“使用积分”等多个子任务。简单分类只能触发一个主流程无法自动拆解出里面的子步骤导致后续对话异常冗长。核心痛点就两个一是“多轮对话状态维护难”用户可能在对话中随时插入新条件或修改旧条件系统需要精准地记住并更新上下文而不是每次都从头开始。二是“模糊或复合意图识别难”用户一句话里可能包含多个请求或者描述得非常口语化、不完整。2. 技术选型为什么是LLMHTN而不是纯LLM一开始我们想过直接用LLM“包打天下”让LLM根据对话历史直接输出下一步该执行什么动作甚至生成调用API的参数。但实测下来发现几个问题响应速度慢尤其是对于长上下文每次都需要把整个历史对话喂给LLM推理耗时很长成本也高。输出不稳定LLM的生成结果有一定随机性可能这次输出正确的JSON格式下次就多了一段废话导致后端解析失败。可解释性差系统为什么做出某个决策是依据哪条用户输入这在需要严格审核的客服场景下是个隐患。难以保证业务流程LLM可能“创造性”地跳过一些必要的合规步骤比如验证身份。而分层任务网络HTN是一种经典的人工智能规划方法它的核心思想是“任务分解”。它有一个“方法库”里面定义了如何将一个高层任务如“订机票”分解为一系列低层任务如“选择出发地”、“选择目的地”、“选择日期”……直到分解为可以直接执行的“原子动作”。LLM HTN的混合方案正好取长补短LLM作为“理解官”负责最难的部分——理解用户的自然语言从中抽取出结构化的“任务请求”和“参数槽位”。它把非结构化的语言转化成HTN能理解的“高层任务描述”。HTN作为“规划官”接收LLM输出的结构化任务根据预定义的方法库严格按照业务逻辑进行分解和排序生成一个可执行的任务流通常是一个有向无环图DAG。这个过程是确定性的、可解释的。简单说LLM负责“听懂人话”HTN负责“按规矩办事”。这样既拥有了LLM强大的语言理解能力又保证了业务流程的确定性、高效性和可解释性。3. 核心实现从框架到代码我们的系统整体基于FastAPI构建轻量且异步支持好。3.1 服务框架与整体流程from fastapi import FastAPI, HTTPException from pydantic import BaseModel from typing import Dict, Any, List import asyncio from .llm_intent import IntentRecognizer from .htn_planner import HTNPlanner from .task_executor import TaskExecutor app FastAPI(titleAI客服智能任务分解系统) class UserQuery(BaseModel): session_id: str utterance: str history: List[Dict[str, Any]] [] # 对话历史 class SystemResponse(BaseModel): session_id: str reply: str current_tasks: List[str] need_human: bool False # 初始化核心组件 intent_recognizer IntentRecognizer() htn_planner HTNPlanner() task_executor TaskExecutor() app.post(/chat, response_modelSystemResponse) async def handle_query(query: UserQuery): 处理用户查询的核心入口 try: # 1. LLM意图识别与槽位填充 structured_task await intent_recognizer.recognize(query.utterance, query.history) # 2. HTN任务分解与规划 task_dag htn_planner.plan(structured_task, query.session_id) # 3. 异步执行任务流 execution_results await task_executor.execute_dag(task_dag, query.session_id) # 4. 生成自然语言回复 reply_text await task_executor.generate_reply(execution_results, query.history) return SystemResponse( session_idquery.session_id, replyreply_text, current_tasks[task.name for task in task_dag.get_ready_tasks()], need_humanexecution_results.get(need_human_assist, False) ) except Exception as e: # 异常处理与降级策略 return await handle_fallback(query, e)3.2 LLM意图识别模块这里的关键是设计好的Prompt引导LLM输出稳定的结构化数据。我们用的是函数调用或工具调用的方式但核心思路一致。import openai from typing import Optional, List from pydantic import BaseModel, Field # 定义LLM需要输出的结构化格式 class ExtractedIntent(BaseModel): LLM提取的意图与参数 primary_task: str Field(description核心任务名称如 book_flight, change_ticket) sub_tasks: List[str] Field(default_factorylist, description识别出的子任务列表) parameters: Dict[str, Any] Field(default_factorydict, description提取到的参数槽位如 {\departure\: \北京\, \date\: \2023-10-27\}) ambiguity: bool Field(defaultFalse, description是否存在歧义需要澄清) clarification_question: Optional[str] Field(defaultNone, description如果需要澄清准备问什么) class IntentRecognizer: def __init__(self, model: str gpt-3.5-turbo): self.client openai.OpenAI() self.model model # 系统Prompt定义了角色和输出格式要求 self.system_prompt 你是一个专业的客服对话分析助手。你的任务是从用户当前话语和对话历史中精确提取出用户想要执行的任务Task和相关的参数Slots。 输出要求 1. 确定一个最主要的primary_task。 2. 如果用户一句话中包含了多个请求在sub_tasks中列出。 3. 从话语中提取所有明确的参数填入parameters字典。如果参数不完整或模糊将ambiguity设为True并在clarification_question中生成一个简洁的澄清问题。 4. 参考对话历史历史中已经确认过的参数如果当前话语没有改变则保留。 可识别的任务类型包括book_flight, cancel_booking, inquire_ticket, use_points, multi_city_booking等。 参数槽位包括departure_city, arrival_city, departure_date, return_date, passenger_count, flight_class等。 async def recognize(self, utterance: str, history: List[Dict]) - ExtractedIntent: 识别用户意图并填充槽位 messages [ {role: system, content: self.system_prompt}, *self._format_history(history), # 将历史对话格式化成LLM消息 {role: user, content: utterance} ] try: # 使用OpenAI的JSON响应模式确保输出格式稳定 response self.client.chat.completions.create( modelself.model, messagesmessages, response_format{type: json_object}, # 关键强制返回JSON temperature0.1, # 低随机性保证输出稳定 ) result json.loads(response.choices[0].message.content) # 将JSON解析为我们定义的Pydantic模型进行验证 return ExtractedIntent(**result) except (json.JSONDecodeError, ValidationError) as e: # 如果LLM输出格式错误触发降级流程 return await self._fallback_recognition(utterance) def _format_history(self, history: List[Dict]) - List[Dict]: 将对话历史格式化为LLM消息列表 formatted [] for turn in history[-5:]: # 只保留最近5轮对话防止token超限 role assistant if turn[speaker] bot else user formatted.append({role: role, content: turn[text]}) return formattedPrompt Engineering技巧角色定义明确告诉LLM“你是什么”使其行为更聚焦。输出格式锁定使用response_format{type: json_object}或类似功能强制JSON输出极大提升稳定性。历史压缩只保留最近几轮对话并在Prompt中说明“参考历史”既提供了上下文又控制了token数量。低温度Temperature设置为0.1或0.2减少随机性让输出更可控。提供范例在系统Prompt中列出任务和参数示例进行少样本学习Few-shot Learning。3.3 HTN任务分解器与DAG构建这是系统的“大脑”。我们预定义了一个方法库Method Library里面是各种任务分解规则。from typing import Dict, List, Set, Optional from enum import Enum import networkx as nx class TaskStatus(Enum): PENDING pending READY ready RUNNING running SUCCESS success FAILED failed class TaskNode: 任务图中的节点 def __init__(self, task_id: str, name: str, action: Optional[str] None, params: Dict None): self.id task_id self.name name # 任务名称如 “select_departure_city” self.action action # 对应的执行函数名原子任务才有 self.params params or {} self.status TaskStatus.PENDING self.preconditions: Set[str] set() # 前置任务ID集合 self.children: Set[str] set() # 后续任务ID集合 class HTNPlanner: def __init__(self): # 初始化方法库键为高层任务名值为分解方法函数或规则列表 self.methods self._load_methods() # 原子任务集合这些任务不可再分直接对应可执行动作 self.primitive_tasks {fetch_flight_info, validate_identity, deduct_points, call_payment_api} def _load_methods(self) - Dict[str, List]: 加载预定义的任务分解方法 # 这里可以用配置文件、数据库或代码直接定义 return { book_flight: [ # 方法1标准单程/往返订票流程 {precond: {trip_type: [one_way, round_trip]}, steps: [select_departure_city, select_arrival_city, select_dates, search_flights, choose_flight, fill_passenger_info, make_payment]}, # 方法2多城市复杂行程 {precond: {trip_type: multi_city}, steps: [plan_multi_city_route, for_each_segment: select_cities_and_date, search_flights_for_all, optimize_itinerary, ...]}, ], use_points: [ {precond: {}, steps: [check_points_balance, select_points_to_use, confirm_deduction]} ] } def plan(self, intent: ExtractedIntent, session_id: str) - nx.DiGraph: 核心规划函数将意图分解为任务DAG task_graph nx.DiGraph() root_task_id f{session_id}_root # 1. 创建根任务节点即用户的主要意图 root_node TaskNode(root_task_id, intent.primary_task) task_graph.add_node(root_task_id, noderoot_node) # 2. 递归分解任务 self._decompose_task(task_graph, root_task_id, intent.parameters) # 3. 处理并行子任务如intent.sub_tasks # 例如用户同时要“订机票”和“用积分”这两个高层任务可以并行 for sub_task_name in intent.sub_tasks: sub_task_id f{session_id}_{sub_task_name} sub_node TaskNode(sub_task_id, sub_task_name) task_graph.add_node(sub_task_id, nodesub_node) # 将此子任务节点与根节点连接通常是并行关系无依赖 # task_graph.add_edge(root_task_id, sub_task_id) # 更常见的做法是作为独立的分解起点 self._decompose_task(task_graph, sub_task_id, intent.parameters) # 4. 为DAG设置初始的“就绪”状态即没有前置任务的任务 self._update_task_readiness(task_graph) return task_graph def _decompose_task(self, graph: nx.DiGraph, task_id: str, params: Dict): 递归分解一个任务 task_node graph.nodes[task_id][node] # 如果是原子任务则停止分解 if task_node.name in self.primitive_tasks: task_node.action task_node.name # 关联执行动作 return # 查找分解方法 method self._find_applicable_method(task_node.name, params) if not method: # 如果没有找到方法将其标记为“需要人工”或降级为原子任务 task_node.action require_human_assist return # 应用方法进行分解 previous_step_id None for step_index, step_name in enumerate(method[steps]): step_id f{task_id}_{step_index}_{step_name} step_node TaskNode(step_id, step_name) graph.add_node(step_id, nodestep_node) # 建立当前步骤与父任务的连接可选用于跟踪 # graph.add_edge(task_id, step_id) # 建立步骤间的顺序依赖串行 if previous_step_id: step_node.preconditions.add(previous_step_id) graph.nodes[previous_step_id][node].children.add(step_id) graph.add_edge(previous_step_id, step_id) previous_step_id step_id # 递归分解子步骤 self._decompose_task(graph, step_id, params) # 将父任务的所有子任务中的“叶子节点”即最终可执行动作与父任务关联 # 这部分逻辑根据实际需求设计可能不需要 def _find_applicable_method(self, task_name: str, params: Dict) - Optional[Dict]: 根据当前参数找到最适合的任务分解方法 if task_name not in self.methods: return None candidate_methods self.methods[task_name] for method in candidate_methods: # 检查该方法的前提条件precond是否被当前参数满足 if self._check_preconditions(method.get(precond, {}), params): return method return None # 没有方法满足条件 def _check_preconditions(self, precond: Dict, params: Dict) - bool: 检查前提条件是否满足简化版 for key, expected_values in precond.items(): if key not in params: return False if isinstance(expected_values, list): if params[key] not in expected_values: return False elif params[key] ! expected_values: return False return True def _update_task_readiness(self, graph: nx.DiGraph): 更新图中所有任务节点的就绪状态 for node_id in graph.nodes(): task_node graph.nodes[node_id][node] # 如果一个任务没有前置条件或者所有前置任务都成功了则标记为READY if not task_node.preconditions: task_node.status TaskStatus.READY # 更复杂的逻辑可以在这里实现比如检查前置任务的状态这个HTNPlanner的核心是plan方法它接收LLM提取的结构化意图然后根据方法库像搭积木一样递归地将其分解成一个由TaskNode组成的有向无环图DAG。DAG清晰地定义了任务之间的依赖关系哪些必须先做哪些可以并行。4. 性能优化实战架构搭好了但要上线承受真实流量性能优化是必须过的一关。4.1 异步任务调度与执行我们使用asyncioRedis实现了一个轻量级的异步任务队列。TaskExecutor不直接执行耗时操作如调用外部航班查询API而是将TaskNode包装成任务消息发送到Redis队列。import asyncio import redis.asyncio as redis from concurrent.futures import ThreadPoolExecutor import json class AsyncTaskExecutor: def __init__(self, redis_url: str): self.redis_client redis.from_url(redis_url) self.task_queue_key ai_customer_service:tasks self.result_channel_prefix task_result: # 用于执行同步的、CPU密集型或阻塞IO操作的线程池 self.io_executor ThreadPoolExecutor(max_workers10) async def execute_dag(self, dag: nx.DiGraph, session_id: str) - Dict: 异步执行一个任务DAG results {} # 1. 找到所有初始就绪的任务 ready_tasks [nid for nid in dag.nodes if dag.nodes[nid][node].status TaskStatus.READY] # 2. 将就绪任务推送到Redis队列 for task_id in ready_tasks: task_node dag.nodes[task_id][node] task_message { task_id: task_id, session_id: session_id, action: task_node.action, params: task_node.params } await self.redis_client.lpush(self.task_queue_key, json.dumps(task_message)) dag.nodes[task_id][node].status TaskStatus.RUNNING # 3. 订阅本session的结果频道等待任务完成 pubsub self.redis_client.pubsub() result_channel f{self.result_channel_prefix}{session_id} await pubsub.subscribe(result_channel) completed_count 0 total_tasks len(list(dag.nodes())) try: while completed_count total_tasks: message await pubsub.get_message(ignore_subscribe_messagesTrue, timeout5.0) if message: data json.loads(message[data]) task_id data[task_id] success data[success] output data.get(output, {}) # 更新任务图状态 dag.nodes[task_id][node].status TaskStatus.SUCCESS if success else TaskStatus.FAILED results[task_id] output completed_count 1 # 根据任务完成情况触发后续就绪任务简化逻辑 # 实际应检查DAG依赖将新就绪的任务入队 self._trigger_next_tasks(dag, task_id) finally: await pubsub.unsubscribe(result_channel) return results def _trigger_next_tasks(self, dag: nx.DiGraph, completed_task_id: str): 一个任务完成后检查并触发其后继任务 completed_node dag.nodes[completed_task_id][node] for child_id in completed_node.children: child_node dag.nodes[child_id][node] # 检查该子任务的所有前置是否都已完成 if all(dag.nodes[pid][node].status TaskStatus.SUCCESS for pid in child_node.preconditions): child_node.status TaskStatus.READY # 在实际系统中这里应该将child_id对应的任务消息放入队列独立的Worker进程从Redis队列中取出任务消息执行具体的动作如查询数据库、调用外部API然后将结果发布到对应的Redis频道。这样Web服务进程不会被阻塞可以快速响应新的用户请求。4.2 对话上下文压缩策略LLM调用是成本大头而成本与输入的token数量直接相关。我们不能无限制地保存和发送整个对话历史。我们的策略是维护两个版本的上下文。完整历史存储在Redis中包含所有原始对话轮次用于内部状态追踪和复盘。压缩摘要在每次调用LLM前动态生成一个精简版上下文。关键信息提取使用一个更小的、便宜的模型或规则从完整历史中提取出已确认的“事实”槽位例如{“departure_city”: “北京”, “date”: “下周五”}。最近对话保留总是保留最近2-3轮原始对话保证LLM能理解最新的语言风格和即时意图。摘要生成对于更早的、已完结的子任务对话比如已经完成的“选择日期”步骤将其压缩成一句话摘要如“用户已确认出行日期为2023-10-27”。这样每次发送给LLM的Prompt就变成了“系统已知信息{压缩后的事实摘要} 最近2轮对话 用户当前问题”。通常能将token消耗降低40%-60%。5. 避坑指南那些我们踩过的“坑”5.1 对话状态管理的幂等性在异步、可能并发的环境下同一个session_id可能会收到连续快速的两个请求比如网络不好用户点了两次。如果两个请求都触发了任务规划可能会导致重复订票等严重问题。我们的解决方案会话锁在处理一个会话的请求前先获取一个基于session_id的Redis分布式锁。确保同一时间只有一个请求能修改该会话的任务图状态。请求去重在短时间内如500ms对完全相同的用户语句进行哈希去重。任务ID唯一性每个TaskNode的ID都包含session_id和唯一序号或时间戳确保全局唯一避免执行冲突。状态机校验在执行原子动作如支付前再次检查数据库中的最新状态确保业务状态的一致性。5.2 冷启动与默认流程配置LLM可能识别不出意图或者HTN方法库没有对应分解规则。系统必须有“托底”方案。LLM识别兜底当LLM的返回置信度低于阈值或JSON解析失败时触发一个基于关键词匹配的规则引擎作为后备至少能识别出“转人工”、“问候”等基础意图。HTN默认流程对于识别出的意图primary_task如果在方法库中找不到完全匹配的则映射到一个通用的generic_inquiry流程。这个流程很简单1) 请求LLM生成一个澄清问题2) 等待用户回复3) 更新参数后再次尝试匹配。这比直接报错体验好得多。人工接管信号在任务执行过程中如果连续多次需要澄清或触发了特定的错误如政策不允许TaskNode的状态会被标记为REQUIRE_HUMAN系统会自动创建工单并通知人工客服介入。6. 效果验证数据说话我们进行了为期两周的A/B测试。A组对照组使用旧的基于规则树的客服机器人。B组实验组使用新的LLMHTN智能任务分解系统。核心指标对比如下指标A组 (旧系统)B组 (新系统)提升复杂问题解决率34%89%161%平均响应时间4.2秒1.8秒降低57%人工转接率45%12%降低73%系统吞吐量 (QPS)1852提升189%用户满意度 (CSAT)3.5/5.04.3/5.0显著提升注复杂问题定义为需要3个以上步骤或涉及多个参数的问题。提升最明显的就是复杂问题的处理。旧系统面对多步问题基本“瘫痪”而新系统能一步步引导用户完成大部分场景无需人工。响应时间的提升主要得益于HTN规划的高效性和异步执行LLM只用在最开始的意图识别阶段。结尾与思考这套LLMHTN的架构确实让我们尝到了甜头它把LLM的“聪明”和HTN的“靠谱”结合得不错。不过在实际运营中我们也在思考一个新的问题如何平衡任务分解的粒度与系统响应延迟如果把任务拆得太细比如“订机票”拆成20个微步骤每个步骤都要和用户交互一次虽然流程严谨但对话轮次会变多整体完成时间拉长用户可能觉得啰嗦。如果拆得太粗比如只拆成3个大步骤每个步骤内部逻辑复杂可能出错率高且一旦出错就要回退更多。我们目前的思路是引入一个“复杂度预测器”。在HTN规划之初根据提取的参数多少和任务本身的复杂度动态选择不同粒度的分解方法。对于熟练用户或简单任务走“快速通道”粗粒度对于新用户或复杂任务走“引导通道”细粒度。同时允许用户在对话中说出“跳过”来合并某些确认步骤。这只是一个初步想法如何准确预测、如何平滑切换都是下一步要探索的。技术的优化永远在路上关键是找到业务体验和技术成本之间的那个最佳平衡点。希望我们这次的实践分享能给大家带来一些启发。