招标网站的服务费怎么做分录中国芗城区城乡建设局网站
招标网站的服务费怎么做分录,中国芗城区城乡建设局网站,个人是否可以申请持有网站,怎么做卖保险的网站Chandra智能体开发实战#xff1a;基于LangChain的自动化工作流
你是不是也遇到过这样的场景#xff1f;每天上班#xff0c;打开电脑#xff0c;一堆重复性的任务等着你#xff1a;从不同系统里导出数据、手动整理成报告、再发邮件给不同的人。这些活儿说难不难#xf…Chandra智能体开发实战基于LangChain的自动化工作流你是不是也遇到过这样的场景每天上班打开电脑一堆重复性的任务等着你从不同系统里导出数据、手动整理成报告、再发邮件给不同的人。这些活儿说难不难但特别耗时间还容易出错。我之前带的一个项目组每周都要花大半天时间做周报汇总。产品数据从A系统导出运营数据在B系统里技术指标又在C平台上。几个人手动复制粘贴核对半天最后发出去的邮件还经常有格式问题。后来我们尝试用AI来解决这个问题但发现普通的聊天助手只能回答简单问题没法自动执行这一系列操作。直到我们开始用Chandra结合LangChain框架才真正实现了工作流的自动化。现在这个项目组每周的周报汇总工作从原来的半天缩短到了10分钟而且准确率100%。今天我就来分享这个实战方案看看怎么用Chandra和LangChain构建智能自动化工作流。1. 为什么选择Chandra LangChain组合在开始具体实现之前我们先聊聊为什么这个组合特别适合做自动化工作流。Chandra是一个开箱即用的本地AI聊天系统最大的优势是完全私有化部署。你的数据、你的对话、你的工作流程全部都在你自己的服务器上不用担心数据泄露问题。这对于企业应用来说特别重要尤其是处理敏感的业务数据。LangChain则是一个专门为构建大语言模型应用设计的框架。它提供了一套完整的工具链让AI不仅能“聊天”还能“做事”。比如调用外部API、操作数据库、执行代码、管理工具调用等等。把这两个结合起来Chandra提供了强大的对话和理解能力LangChain提供了执行和协调能力正好形成一个完整的“大脑手脚”组合。我对比过几种方案发现这个组合有几个明显优势部署简单Chandra有现成的镜像拉下来就能用不需要从零开始配置CUDA环境扩展性强LangChain有丰富的工具库可以轻松集成各种外部系统成本可控本地部署没有API调用费用适合长期运行学习曲线平缓如果你会用Python基本上一天就能上手2. 环境准备与快速部署2.1 部署ChandraChandra的部署真的很简单基本上就是三步# 拉取镜像 docker pull chandra-ai/chandra:latest # 运行容器 docker run -d \ --name chandra \ -p 7860:7860 \ --gpus all \ chandra-ai/chandra:latest # 访问界面 # 打开浏览器访问 http://localhost:7860如果你没有GPU也可以用CPU版本不过速度会慢一些docker run -d \ --name chandra-cpu \ -p 7860:7860 \ chandra-ai/chandra:cpu-latest部署完成后打开浏览器就能看到一个简洁的聊天界面。你可以先试试基本的对话功能感受一下Chandra的理解能力。2.2 安装LangChain和相关依赖接下来安装Python环境。我建议用虚拟环境避免包冲突# 创建虚拟环境 python -m venv chandra-env # 激活虚拟环境 # Linux/Mac source chandra-env/bin/activate # Windows chandra-env\Scripts\activate # 安装核心包 pip install langchain langchain-community pip install requests python-dotenvLangChain的版本更新比较快我建议安装指定版本避免兼容性问题pip install langchain0.1.0 pip install langchain-community0.0.102.3 配置Chandra的API访问Chandra默认提供了API接口我们需要配置一下访问方式。创建一个.env文件# .env文件 CHANDRA_BASE_URLhttp://localhost:7860 CHANDRA_API_KEYyour_api_key_here # 如果有的话然后在Python代码中配置连接import os from dotenv import load_dotenv from langchain_community.llms import VLLM # 加载环境变量 load_dotenv() # 配置Chandra作为LLM chandra_llm VLLM( modelchandra, vllm_kwargs{ api_url: os.getenv(CHANDRA_BASE_URL) /v1/completions, api_key: os.getenv(CHANDRA_API_KEY, ), }, max_tokens1024, temperature0.1, # 低温度让输出更稳定 )这样就完成了基础环境的搭建。整个过程大概15-20分钟比很多复杂的AI框架要简单得多。3. 构建第一个自动化工作流周报汇总我们从一个实际场景开始自动生成项目周报。这个工作流需要完成以下任务从不同系统获取数据分析数据并生成洞察整理成标准格式的报告发送给相关人员3.1 定义工作流步骤首先我们明确一下工作流的具体步骤from langchain.agents import Tool, AgentExecutor, create_react_agent from langchain.prompts import PromptTemplate from langchain.memory import ConversationBufferMemory # 定义工作流步骤 workflow_steps [ { name: 获取产品数据, description: 从产品管理系统获取本周的产品数据, tool: fetch_product_data }, { name: 获取运营数据, description: 从运营平台获取用户活跃度和转化数据, tool: fetch_operation_data }, { name: 获取技术指标, description: 从监控系统获取系统性能和错误率数据, tool: fetch_tech_metrics }, { name: 数据分析, description: 分析各项数据找出关键变化和问题, tool: analyze_data }, { name: 生成报告, description: 将分析结果整理成标准格式的周报, tool: generate_report }, { name: 发送通知, description: 将报告发送给项目组成员, tool: send_notification } ]3.2 实现工具函数每个步骤都需要对应的工具函数。我们先实现几个示例import json from datetime import datetime, timedelta import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart class WorkflowTools: 工作流工具集合 staticmethod def fetch_product_data(date_rangeNone): 模拟获取产品数据 if date_range is None: end_date datetime.now() start_date end_date - timedelta(days7) else: start_date, end_date date_range # 这里应该是实际的API调用 # 为了示例我们返回模拟数据 return { period: f{start_date.strftime(%Y-%m-%d)} 到 {end_date.strftime(%Y-%m-%d)}, new_users: 1250, active_users: 8450, retention_rate: 0.78, feature_usage: { feature_a: 3200, feature_b: 2100, feature_c: 980 } } staticmethod def fetch_operation_data(date_rangeNone): 模拟获取运营数据 return { marketing_campaigns: 3, conversion_rate: 0.15, customer_feedback: { positive: 42, neutral: 18, negative: 5 }, revenue: 125000 } staticmethod def fetch_tech_metrics(date_rangeNone): 模拟获取技术指标 return { system_availability: 0.9995, avg_response_time: 245, # 毫秒 error_rate: 0.002, active_servers: 8, data_processed: 2.5TB } staticmethod def analyze_data(product_data, operation_data, tech_data): 分析数据并生成洞察 insights [] # 分析用户增长 if product_data[new_users] 1000: insights.append(本周新增用户超过1000增长势头良好) # 分析留存率 if product_data[retention_rate] 0.8: insights.append(f用户留存率({product_data[retention_rate]*100}%)有提升空间) # 分析转化率 if operation_data[conversion_rate] 0.2: insights.append(f转化率({operation_data[conversion_rate]*100}%)需要优化) # 分析系统性能 if tech_data[avg_response_time] 300: insights.append(f平均响应时间({tech_data[avg_response_time]}ms)偏高) return { insights: insights, key_metrics: { user_growth: product_data[new_users], revenue: operation_data[revenue], system_stability: tech_data[system_availability] } } staticmethod def generate_report(insights_data, templatestandard): 生成周报 report_date datetime.now().strftime(%Y年%m月%d日) report f# 项目周报 ({report_date}) ## 一、核心指标概览 - 新增用户{insights_data[key_metrics][user_growth]}人 - 本周营收¥{insights_data[key_metrics][revenue]:,.0f} - 系统可用性{insights_data[key_metrics][system_stability]*100:.2f}% ## 二、关键洞察 for i, insight in enumerate(insights_data[insights], 1): report f{i}. {insight}\n report ## 三、下周重点 1. 持续监控用户增长趋势 2. 优化转化漏斗 3. 提升系统响应速度 ## 四、风险与建议 - 风险用户留存率有待提升 - 建议开展用户调研了解流失原因 return report staticmethod def send_notification(report, recipients, subject项目周报): 发送通知模拟 print(f发送周报到: {, .join(recipients)}) print(f邮件主题: {subject}) print(f报告内容:\n{report}) # 实际应用中这里应该是真实的邮件发送逻辑 # 为了示例我们只打印日志 return { status: success, recipients: recipients, timestamp: datetime.now().isoformat() }3.3 创建智能体工作流现在我们把所有组件组合起来创建一个完整的智能体from langchain.agents import AgentExecutor, create_react_agent from langchain.tools import Tool from langchain.memory import ConversationBufferMemory def create_weekly_report_agent(llm): 创建周报生成智能体 tools WorkflowTools() # 定义工具列表 tool_list [ Tool( name获取产品数据, functools.fetch_product_data, description获取产品相关的用户数据和功能使用数据 ), Tool( name获取运营数据, functools.fetch_operation_data, description获取运营相关的市场活动和收入数据 ), Tool( name获取技术指标, functools.fetch_tech_metrics, description获取系统性能和技术指标数据 ), Tool( name数据分析, funclambda **kwargs: tools.analyze_data( kwargs.get(product_data, {}), kwargs.get(operation_data, {}), kwargs.get(tech_data, {}) ), description分析各项数据并生成业务洞察 ), Tool( name生成报告, functools.generate_report, description根据分析结果生成格式化的周报 ), Tool( name发送通知, functools.send_notification, description将报告发送给相关人员 ) ] # 创建提示模板 prompt_template 你是一个项目周报自动化助手。请按照以下步骤执行周报生成工作流 步骤 1. 获取产品数据 2. 获取运营数据 3. 获取技术指标 4. 分析所有数据找出关键洞察 5. 生成格式化的周报 6. 发送给项目组成员 请一步一步执行确保每个步骤都完成后再进行下一步。 当前对话 {history} 人类{input} 助手 prompt PromptTemplate( input_variables[history, input], templateprompt_template ) # 创建记忆 memory ConversationBufferMemory(memory_keyhistory) # 创建智能体 agent create_react_agent( llmllm, toolstool_list, promptprompt ) # 创建执行器 agent_executor AgentExecutor( agentagent, toolstool_list, memorymemory, verboseTrue, # 显示详细执行过程 handle_parsing_errorsTrue ) return agent_executor # 使用智能体 def generate_weekly_report(): 执行周报生成工作流 agent create_weekly_report_agent(chandra_llm) # 执行工作流 result agent.invoke({ input: 请生成本周的项目周报并发送给项目组全体成员 }) return result # 运行工作流 if __name__ __main__: print(开始生成周报...) result generate_weekly_report() print(\n工作流执行完成) print(f输出结果: {result[output]})4. 高级功能动态任务分解与执行上面的例子是固定步骤的工作流但实际工作中很多任务需要动态分解。比如“帮我分析一下上个月的业务数据”这个任务需要智能体自己决定要获取哪些数据、怎么分析、输出什么格式。4.1 实现动态任务分解from langchain.chains import LLMChain from langchain.prompts import ChatPromptTemplate class DynamicWorkflowAgent: 动态工作流智能体 def __init__(self, llm): self.llm llm self.tools WorkflowTools() def plan_tasks(self, user_request): 根据用户请求规划任务步骤 planning_prompt ChatPromptTemplate.from_messages([ (system, 你是一个任务规划专家。请根据用户的需求分解出具体的执行步骤。 每个步骤应该清晰、可执行并且有明确的输入输出。 请按照以下格式输出 步骤1: [步骤描述] | 工具: [工具名称] | 输入: [输入参数] 步骤2: [步骤描述] | 工具: [工具名称] | 输入: [输入参数] ...), (human, {request}) ]) chain planning_prompt | self.llm plan chain.invoke({request: user_request}) # 解析计划 tasks self._parse_plan(plan.content) return tasks def _parse_plan(self, plan_text): 解析计划文本 tasks [] lines plan_text.strip().split(\n) for line in lines: if line.startswith(步骤): parts line.split(|) if len(parts) 3: task { description: parts[0].split(:)[1].strip(), tool: parts[1].split(:)[1].strip() if len(parts) 1 else , input: parts[2].split(:)[1].strip() if len(parts) 2 else } tasks.append(task) return tasks def execute_tasks(self, tasks): 执行任务列表 results [] context {} # 存储中间结果 for i, task in enumerate(tasks, 1): print(f执行步骤{i}: {task[description]}) try: # 根据工具名调用对应的函数 tool_func getattr(self.tools, task[tool], None) if tool_func: # 解析输入参数 if task[input]: # 简单的参数解析实际应用可能需要更复杂的逻辑 kwargs eval(fdict({task[input]})) result tool_func(**kwargs) else: result tool_func() # 保存结果到上下文 context[fstep_{i}_result] result results.append({ step: i, status: success, result: result }) print(f 结果: {str(result)[:100]}...) else: results.append({ step: i, status: error, error: f工具 {task[tool]} 不存在 }) except Exception as e: results.append({ step: i, status: error, error: str(e) }) return results, context # 使用动态工作流 def run_dynamic_workflow(user_request): 运行动态工作流 agent DynamicWorkflowAgent(chandra_llm) print(f用户请求: {user_request}) print(正在规划任务...) # 规划任务 tasks agent.plan_tasks(user_request) print(f规划出 {len(tasks)} 个任务:) for i, task in enumerate(tasks, 1): print(f {i}. {task[description]} (使用工具: {task[tool]})) print(\n开始执行任务...) # 执行任务 results, context agent.execute_tasks(tasks) print(\n任务执行完成) # 汇总结果 successful sum(1 for r in results if r[status] success) failed sum(1 for r in results if r[status] error) return { total_tasks: len(tasks), successful: successful, failed: failed, results: results, context: context } # 示例运行动态分析 if __name__ __main__: request 分析上个月的用户增长情况和系统性能找出需要改进的地方 result run_dynamic_workflow(request) print(f\n汇总: 共{result[total_tasks]}个任务成功{result[successful]}个失败{result[failed]}个)4.2 处理复杂依赖关系实际工作中任务之间往往有依赖关系。比如必须先获取数据才能进行分析。我们需要更智能的依赖管理from typing import List, Dict, Any import networkx as nx class DependencyAwareWorkflow: 支持依赖关系的工作流 def __init__(self): self.graph nx.DiGraph() self.tasks {} def add_task(self, task_id: str, task_func, dependencies: List[str] None): 添加任务 self.tasks[task_id] task_func self.graph.add_node(task_id) if dependencies: for dep in dependencies: self.graph.add_edge(dep, task_id) def get_execution_order(self): 获取执行顺序拓扑排序 try: order list(nx.topological_sort(self.graph)) return order except nx.NetworkXUnfeasible: raise ValueError(工作流中存在循环依赖) def execute(self, initial_context: Dict[str, Any] None): 执行工作流 if initial_context is None: context {} else: context initial_context.copy() execution_order self.get_execution_order() results {} print(工作流执行顺序:, - .join(execution_order)) for task_id in execution_order: print(f\n执行任务: {task_id}) # 准备参数 task_func self.tasks[task_id] # 这里可以根据任务函数的签名动态传递参数 # 简化版传递整个上下文 try: result task_func(context) context[task_id] result results[task_id] { status: success, result: result } print(f 成功: {str(result)[:80]}...) except Exception as e: results[task_id] { status: error, error: str(e) } print(f 失败: {str(e)}) return results, context # 使用依赖感知工作流 def create_data_analysis_workflow(): 创建数据分析工作流 workflow DependencyAwareWorkflow() tools WorkflowTools() # 定义任务 workflow.add_task( fetch_data, lambda ctx: { product: tools.fetch_product_data(), operation: tools.fetch_operation_data(), tech: tools.fetch_tech_metrics() } ) workflow.add_task( analyze_growth, lambda ctx: { user_growth: ctx[fetch_data][product][new_users], growth_rate: ctx[fetch_data][product][new_users] / 1000 # 简化计算 }, dependencies[fetch_data] ) workflow.add_task( analyze_performance, lambda ctx: { response_time: ctx[fetch_data][tech][avg_response_time], needs_optimization: ctx[fetch_data][tech][avg_response_time] 300 }, dependencies[fetch_data] ) workflow.add_task( generate_insights, lambda ctx: { insights: [ f用户增长: {ctx[analyze_growth][user_growth]}人, f增长率: {ctx[analyze_growth][growth_rate]:.1%}, f系统响应时间: {ctx[analyze_performance][response_time]}ms, 需要优化 if ctx[analyze_performance][needs_optimization] else 性能良好 ] }, dependencies[analyze_growth, analyze_performance] ) return workflow # 执行工作流 workflow create_data_analysis_workflow() results, context workflow.execute() print(\n最终结果:) for task_id, result in results.items(): if result[status] success: print(f{task_id}: 成功) else: print(f{task_id}: 失败 - {result[error]})5. 实际应用场景扩展基于Chandra和LangChain的自动化工作流可以应用到很多实际场景中。我分享几个我们团队实际用过的例子5.1 客户支持自动化class CustomerSupportWorkflow: 客户支持工作流 def __init__(self, llm): self.llm llm def handle_ticket(self, ticket_content): 处理客户工单 # 1. 分析工单内容 analysis self._analyze_ticket(ticket_content) # 2. 根据问题类型路由 if analysis[urgency] high: return self._handle_urgent_ticket(ticket_content, analysis) elif analysis[category] technical: return self._handle_technical_ticket(ticket_content, analysis) elif analysis[category] billing: return self._handle_billing_ticket(ticket_content, analysis) else: return self._handle_general_ticket(ticket_content, analysis) def _analyze_ticket(self, content): 分析工单 prompt f分析以下客户工单内容 {content} 请判断 1. 问题类别technical/billing/general 2. 紧急程度high/medium/low 3. 建议的处理方式 以JSON格式回复。 response self.llm.invoke(prompt) # 解析响应返回结构化数据 return self._parse_analysis(response)5.2 数据监控与告警class MonitoringWorkflow: 监控告警工作流 def check_system_health(self): 检查系统健康状态 metrics self._collect_metrics() alerts [] # 检查各项指标 if metrics[cpu_usage] 80: alerts.append({ level: warning, message: fCPU使用率过高: {metrics[cpu_usage]}%, suggestion: 检查是否有异常进程 }) if metrics[memory_usage] 90: alerts.append({ level: critical, message: f内存使用率过高: {metrics[memory_usage]}%, suggestion: 考虑扩容或优化内存使用 }) if metrics[error_rate] 0.01: alerts.append({ level: warning, message: f错误率偏高: {metrics[error_rate]*100:.2f}%, suggestion: 检查最近部署的代码 }) # 如果有告警发送通知 if alerts: self._send_alerts(alerts) return { status: healthy if not alerts else unhealthy, alerts: alerts, timestamp: datetime.now().isoformat() }5.3 内容审核自动化class ContentModerationWorkflow: 内容审核工作流 def moderate_content(self, content, content_typetext): 审核内容 # 1. 安全检查 safety_check self._check_safety(content) if not safety_check[passed]: return { action: reject, reason: safety_check[reasons], confidence: safety_check[confidence] } # 2. 质量检查 quality_check self._check_quality(content) if quality_check[score] 0.6: return { action: review, reason: 内容质量较低需要人工审核, score: quality_check[score] } # 3. 分类打标 categories self._classify_content(content) return { action: approve, categories: categories, confidence: 0.85 # 示例置信度 }6. 部署与运维建议6.1 生产环境部署对于生产环境我建议采用以下架构# docker-compose.yml 示例 version: 3.8 services: chandra: image: chandra-ai/chandra:latest container_name: chandra ports: - 7860:7860 volumes: - ./data:/app/data - ./models:/app/models environment: - MODEL_CACHE_DIR/app/models - DATA_DIR/app/data deploy: resources: reservations: devices: - driver: nvidia count: 1 capabilities: [gpu] restart: unless-stopped workflow-api: build: ./workflow-api container_name: workflow-api ports: - 8000:8000 environment: - CHANDRA_URLhttp://chandra:7860 - DATABASE_URLpostgresql://user:passdb:5432/workflows depends_on: - chandra - db restart: unless-stopped db: image: postgres:15 container_name: postgres environment: - POSTGRES_DBworkflows - POSTGRES_USERuser - POSTGRES_PASSWORDpass volumes: - postgres_data:/var/lib/postgresql/data restart: unless-stopped volumes: postgres_data:6.2 监控与日志import logging from logging.handlers import RotatingFileHandler import json def setup_logging(): 配置日志 logger logging.getLogger(workflow) logger.setLevel(logging.INFO) # 文件日志 file_handler RotatingFileHandler( workflow.log, maxBytes10*1024*1024, # 10MB backupCount5 ) file_handler.setFormatter(logging.Formatter( %(asctime)s - %(name)s - %(levelname)s - %(message)s )) # 控制台日志 console_handler logging.StreamHandler() console_handler.setFormatter(logging.Formatter( %(levelname)s: %(message)s )) logger.addHandler(file_handler) logger.addHandler(console_handler) return logger class WorkflowMonitor: 工作流监控 def __init__(self, logger): self.logger logger self.metrics { total_executions: 0, successful: 0, failed: 0, avg_duration: 0 } def record_execution(self, workflow_name, duration, success, errorNone): 记录执行情况 self.metrics[total_executions] 1 if success: self.metrics[successful] 1 self.logger.info(f工作流 {workflow_name} 执行成功耗时 {duration:.2f}秒) else: self.metrics[failed] 1 self.logger.error(f工作流 {workflow_name} 执行失败: {error}) # 更新平均耗时 total_time self.metrics[avg_duration] * (self.metrics[total_executions] - 1) self.metrics[avg_duration] (total_time duration) / self.metrics[total_executions] def get_report(self): 获取监控报告 success_rate (self.metrics[successful] / self.metrics[total_executions] * 100 if self.metrics[total_executions] 0 else 0) return { success_rate: f{success_rate:.1f}%, avg_duration: f{self.metrics[avg_duration]:.2f}秒, total_executions: self.metrics[total_executions] }6.3 错误处理与重试import time from functools import wraps from typing import Callable, Any def retry(max_attempts: int 3, delay: float 1.0): 重试装饰器 def decorator(func: Callable) - Callable: wraps(func) def wrapper(*args, **kwargs) - Any: last_exception None for attempt in range(max_attempts): try: return func(*args, **kwargs) except Exception as e: last_exception e if attempt max_attempts - 1: wait_time delay * (2 ** attempt) # 指数退避 print(f尝试 {func.__name__} 失败 (第{attempt 1}次){wait_time}秒后重试...) time.sleep(wait_time) else: print(f尝试 {func.__name__} 失败已达最大重试次数) raise last_exception return wrapper return decorator class ResilientWorkflow: 具有容错能力的工作流 retry(max_attempts3, delay2.0) def fetch_data_with_retry(self, data_source): 带重试的数据获取 # 这里实现具体的数据获取逻辑 pass def execute_with_fallback(self, main_func, fallback_func): 执行主函数失败时执行备用函数 try: return main_func() except Exception as e: print(f主函数执行失败尝试备用方案: {e}) return fallback_func()7. 总结用Chandra和LangChain构建自动化工作流最大的感受就是“实用”。不像有些AI项目看着很炫酷但实际用起来各种问题。这个组合从部署到开发整个流程都比较顺畅。我们团队用这套方案快半年了处理了上万次自动化任务。最明显的变化是大家从重复性的手工劳动中解放出来了有更多时间做更有价值的事情。比如原来花半天时间做周报的同事现在用这个系统10分钟搞定剩下的时间可以做深度数据分析。如果你也想尝试自动化工作流我的建议是从小处着手先找一个最痛点的场景比如周报生成、数据汇总这种每周都要做的重复工作逐步完善不要想着一口气做出完美的系统先实现核心功能再慢慢添加高级特性重视监控自动化系统最怕的就是“黑盒”一定要做好日志和监控知道系统在干什么保持简单能用简单方法解决的问题不要用复杂方案。代码越简单维护成本越低实际用下来Chandra的稳定性不错LangChain的生态也很丰富。遇到问题的时候社区里基本上都能找到解决方案。当然任何技术方案都不是银弹这个组合最适合的是那些有明确流程、重复性高的任务。如果你正在被重复性工作困扰或者想提升团队的工作效率真的可以试试这个方案。从部署到跑通第一个工作流快的话半天时间就够了。投入不大但回报可能很显著。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。