网站建站多钱,江门模板建站定制,本机做网站,wordpress不显示内容你DeerFlow可视化开发#xff1a;使用Streamlit构建管理控制台 如果你正在使用DeerFlow这个多智能体研究框架#xff0c;可能会发现一个问题#xff1a;虽然它的研究能力很强大#xff0c;但日常运维管理却不太方便。每次想看任务状态、查日志、监控性能#xff0c;都得去翻…DeerFlow可视化开发使用Streamlit构建管理控制台如果你正在使用DeerFlow这个多智能体研究框架可能会发现一个问题虽然它的研究能力很强大但日常运维管理却不太方便。每次想看任务状态、查日志、监控性能都得去翻命令行或者看原始数据文件操作起来挺麻烦的。其实很多开发者都有类似的痛点——AI系统功能越强大运维管理就越复杂。今天我就来分享一个实用的解决方案用Streamlit为DeerFlow快速搭建一个可视化运维面板。Streamlit是个特别适合做数据应用的工具它能让开发者用Python脚本快速创建交互式Web应用。我之前用它做过好几个内部工具发现特别适合这种需要实时监控和管理的场景。1. 为什么需要DeerFlow管理控制台先说说为什么我觉得DeerFlow需要一个管理控制台。从实际使用经验来看主要有这么几个痛点任务状态不透明DeerFlow执行一个研究任务时你很难实时知道它进行到哪一步了。是在规划阶段还是在搜索资料或者已经在写报告了这些信息都藏在日志文件里不翻出来看根本不知道。日志查看麻烦DeerFlow的日志分散在不同的地方有系统日志、任务日志、错误日志。想查个问题得在多个文件里跳来跳去效率很低。性能监控缺失每个任务用了多少token花了多长时间哪些步骤比较耗时这些关键指标都没有直观的展示方式。操作不够便捷想重启一个失败的任务或者调整某个任务的参数都得手动去改配置、跑命令不够方便。我之前也试过用命令行工具来管理但总觉得不够直观。后来发现Streamlit正好能解决这些问题——它简单、快速而且做出来的界面还挺好看。2. 环境准备与Streamlit基础2.1 安装Streamlit如果你还没用过Streamlit安装特别简单pip install streamlit验证安装是否成功streamlit hello这个命令会打开一个示例应用里面展示了Streamlit的各种功能组件。你可以先看看感受一下Streamlit能做些什么。2.2 理解Streamlit的核心概念Streamlit的设计理念很特别它把Web应用开发变得像写Python脚本一样简单。几个核心概念你需要了解数据流每次用户交互比如点击按钮、调整滑块都会重新运行整个脚本。听起来效率不高但实际上Streamlit做了很多优化只更新需要更新的部分。组件系统Streamlit提供了丰富的内置组件比如按钮、滑块、表格、图表等。你只需要几行代码就能创建交互界面。会话状态这是Streamlit中比较重要的概念用来在多次脚本运行之间保持数据状态。我们后面会用到它来保存用户的选择和设置。布局控制你可以用st.columns()创建多列布局用st.expander()创建可折叠的区域用st.tabs()创建标签页这些都能让界面更整洁。3. 设计DeerFlow管理控制台3.1 功能规划我设计的这个管理控制台主要包含四个核心功能模块任务监控面板实时显示当前运行的任务状态、进度、执行时间等信息。这是最常用的功能用户一眼就能看到系统在干什么。日志查询系统可以按时间、任务ID、日志级别等条件筛选和搜索日志。支持实时日志流查看就像在命令行里用tail -f一样。性能分析视图展示各个任务的资源使用情况包括token消耗、执行时间、API调用次数等。还能看到历史趋势图。系统配置管理可以在这里修改DeerFlow的配置比如切换搜索引擎、调整模型参数等不用再去改配置文件。3.2 技术架构整个系统的架构其实挺简单的DeerFlow系统 → 日志/状态文件 → Streamlit应用 → Web界面Streamlit应用会定期读取DeerFlow产生的日志文件和状态文件然后把这些数据整理成表格、图表展示出来。用户通过Web界面进行操作Streamlit再把操作转换成对DeerFlow系统的控制命令。4. 核心功能实现4.1 任务监控面板的实现任务监控是管理控制台的核心功能。DeerFlow本身没有提供任务状态API但我们可以通过分析它的日志文件来获取任务信息。首先我们需要一个函数来解析DeerFlow的日志文件import json import os from datetime import datetime from pathlib import Path def parse_deerflow_logs(log_dirlogs): 解析DeerFlow日志文件提取任务信息 tasks [] log_path Path(log_dir) if not log_path.exists(): return tasks # 查找最新的任务日志文件 log_files sorted(log_path.glob(task_*.log), keyos.path.getmtime, reverseTrue) for log_file in log_files[:10]: # 只处理最近10个文件 try: task_id log_file.stem.replace(task_, ) task_info { task_id: task_id, file_path: str(log_file), last_modified: datetime.fromtimestamp(log_file.stat().st_mtime), status: unknown, progress: 0, start_time: None, end_time: None } # 读取日志文件内容 with open(log_file, r, encodingutf-8) as f: lines f.readlines() # 分析日志内容判断任务状态 for line in lines: if 开始执行研究任务 in line or Starting research task in line: task_info[status] running task_info[start_time] extract_time_from_log(line) elif 任务完成 in line or Task completed in line: task_info[status] completed task_info[end_time] extract_time_from_log(line) task_info[progress] 100 elif 任务失败 in line or Task failed in line: task_info[status] failed task_info[end_time] extract_time_from_log(line) elif 进度 in line or Progress in line: # 尝试提取进度百分比 import re match re.search(r(\d)%, line) if match: task_info[progress] int(match.group(1)) tasks.append(task_info) except Exception as e: print(f解析日志文件 {log_file} 时出错: {e}) return tasks def extract_time_from_log(log_line): 从日志行中提取时间信息 try: # 假设日志格式为: [2024-01-15 10:30:00] 日志内容 import re match re.search(r\[(.*?)\], log_line) if match: time_str match.group(1) return datetime.strptime(time_str, %Y-%m-%d %H:%M:%S) except: pass return None有了任务数据我们就可以用Streamlit来展示任务监控面板了import streamlit as st import pandas as pd from datetime import datetime def show_task_monitor(): st.header( 任务监控面板) # 刷新按钮 col1, col2 st.columns([3, 1]) with col1: st.subheader(当前运行任务) with col2: if st.button( 刷新, use_container_widthTrue): st.rerun() # 获取任务数据 tasks parse_deerflow_logs() if not tasks: st.info(暂无运行中的任务) return # 转换为DataFrame便于展示 df_tasks pd.DataFrame(tasks) # 计算运行时长 now datetime.now() df_tasks[duration] df_tasks.apply( lambda row: (row[end_time] or now) - row[start_time] if row[start_time] else None, axis1 ) # 格式化显示 df_display df_tasks[[task_id, status, progress, start_time, duration]].copy() df_display[start_time] df_display[start_time].dt.strftime(%Y-%m-%d %H:%M:%S) df_display[duration] df_display[duration].apply( lambda x: str(x).split(.)[0] if x else N/A ) # 状态颜色映射 status_colors { running: , completed: , failed: , unknown: ⚪ } df_display[status] df_display[status].apply( lambda x: f{status_colors.get(x, ⚪)} {x} ) # 进度条显示 df_display[progress_bar] df_display[progress].apply( lambda x: f{x}% ) # 显示表格 st.dataframe( df_display, column_config{ task_id: 任务ID, status: 状态, progress: st.column_config.ProgressColumn( 进度, help任务执行进度, format%d%%, min_value0, max_value100, ), start_time: 开始时间, duration: 运行时长, }, hide_indexTrue, use_container_widthTrue ) # 任务详情展开 for task in tasks: with st.expander(f任务 {task[task_id]} 详情): col1, col2, col3 st.columns(3) with col1: st.metric(状态, task[status]) with col2: st.metric(进度, f{task[progress]}%) with col3: if task[duration]: st.metric(运行时长, str(task[duration]).split(.)[0]) # 显示日志片段 if st.button(f查看日志, keyfview_log_{task[task_id]}): try: with open(task[file_path], r, encodingutf-8) as f: logs f.read() st.text_area(日志内容, logs, height200) except Exception as e: st.error(f读取日志失败: {e})4.2 实时日志查看器日志查看是运维中最常用的功能之一。我们需要一个能实时显示日志并且支持搜索和过滤的工具import time import threading from queue import Queue class LogMonitor: 日志监控器支持实时日志流 def __init__(self, log_dirlogs): self.log_dir Path(log_dir) self.log_queue Queue() self.monitoring False self.current_file None def start_monitoring(self, log_fileNone): 开始监控日志文件 if log_file is None: # 查找最新的日志文件 log_files sorted(self.log_dir.glob(*.log), keyos.path.getmtime, reverseTrue) if log_files: log_file log_files[0] else: return False self.current_file log_file self.monitoring True # 启动监控线程 thread threading.Thread(targetself._monitor_logs, daemonTrue) thread.start() return True def _monitor_logs(self): 监控日志文件的线程函数 last_size 0 while self.monitoring: try: current_size self.current_file.stat().st_size if current_size last_size: with open(self.current_file, r, encodingutf-8) as f: f.seek(last_size) new_content f.read() if new_content: self.log_queue.put(new_content) last_size current_size time.sleep(1) except Exception as e: print(f日志监控出错: {e}) break def stop_monitoring(self): 停止监控 self.monitoring False def get_new_logs(self): 获取新的日志内容 logs [] while not self.log_queue.empty(): logs.append(self.log_queue.get()) return \n.join(logs) def show_log_viewer(): st.header( 日志查看器) # 初始化日志监控器 if log_monitor not in st.session_state: st.session_state.log_monitor LogMonitor() # 日志文件选择 log_dir Path(logs) if log_dir.exists(): log_files sorted(log_dir.glob(*.log), keyos.path.getmtime, reverseTrue) log_file_names [f.name for f in log_files] selected_file st.selectbox( 选择日志文件, log_file_names, index0 if log_file_names else None ) if selected_file: selected_path log_dir / selected_file # 控制按钮 col1, col2, col3 st.columns(3) with col1: if st.button(▶ 开始监控, use_container_widthTrue): if st.session_state.log_monitor.start_monitoring(selected_path): st.success(开始监控日志) else: st.error(启动监控失败) with col2: if st.button(⏸ 暂停监控, use_container_widthTrue): st.session_state.log_monitor.stop_monitoring() st.info(已暂停监控) with col3: if st.button( 清空日志, use_container_widthTrue): try: with open(selected_path, w) as f: f.write() st.success(日志已清空) except Exception as e: st.error(f清空日志失败: {e}) # 日志级别过滤 log_levels [ALL, INFO, WARNING, ERROR, DEBUG] selected_levels st.multiselect( 过滤日志级别, log_levels, default[INFO, WARNING, ERROR] ) # 关键词搜索 search_keyword st.text_input(搜索关键词, placeholder输入关键词过滤日志...) # 实时日志显示区域 log_display st.empty() # 如果是监控模式定期更新日志显示 if st.session_state.log_monitor.monitoring: if st.session_state.log_monitor.current_file selected_path: # 定期获取新日志 if last_update not in st.session_state: st.session_state.last_update time.time() current_time time.time() if current_time - st.session_state.last_update 2: # 每2秒更新一次 new_logs st.session_state.log_monitor.get_new_logs() if new_logs: if log_content not in st.session_state: st.session_state.log_content st.session_state.log_content new_logs st.session_state.last_update current_time # 显示日志内容 if log_content in st.session_state: display_logs st.session_state.log_content # 应用过滤 if selected_levels and ALL not in selected_levels: lines display_logs.split(\n) filtered_lines [] for line in lines: for level in selected_levels: if f[{level}] in line: filtered_lines.append(line) break display_logs \n.join(filtered_lines) # 应用搜索 if search_keyword: lines display_logs.split(\n) matched_lines [line for line in lines if search_keyword.lower() in line.lower()] display_logs \n.join(matched_lines) log_display.text_area( 实时日志, display_logs, height400, keylog_display_area ) else: # 非监控模式显示完整日志文件 try: with open(selected_path, r, encodingutf-8) as f: all_logs f.read() # 应用过滤和搜索 if selected_levels and ALL not in selected_levels: lines all_logs.split(\n) filtered_lines [] for line in lines: for level in selected_levels: if f[{level}] in line: filtered_lines.append(line) break all_logs \n.join(filtered_lines) if search_keyword: lines all_logs.split(\n) matched_lines [line for line in lines if search_keyword.lower() in line.lower()] all_logs \n.join(matched_lines) log_display.text_area( 日志内容, all_logs, height400, keystatic_log_display ) except Exception as e: st.error(f读取日志文件失败: {e}) else: st.warning(日志目录不存在请检查配置)4.3 性能分析仪表盘性能分析能帮助我们了解DeerFlow的运行状况优化资源配置import plotly.graph_objects as go import plotly.express as px from datetime import datetime, timedelta def collect_performance_data(): 收集性能数据 performance_data [] # 这里可以从DeerFlow的日志或数据库中收集性能数据 # 为了演示我们生成一些模拟数据 base_time datetime.now() - timedelta(days7) for i in range(100): task_time base_time timedelta(hoursi*2) performance_data.append({ task_id: ftask_{i:03d}, timestamp: task_time, duration_seconds: 60 i * 10, # 模拟执行时间 token_usage: 1000 i * 50, # 模拟token使用量 api_calls: 3 (i % 5), # 模拟API调用次数 success: i % 10 ! 0, # 模拟成功率90% model: gpt-4 if i % 3 0 else claude-3 if i % 3 1 else gemini }) return pd.DataFrame(performance_data) def show_performance_dashboard(): st.header( 性能分析仪表盘) # 获取性能数据 df_perf collect_performance_data() if df_perf.empty: st.info(暂无性能数据) return # 时间范围选择 col1, col2 st.columns(2) with col1: date_range st.date_input( 选择时间范围, value( datetime.now() - timedelta(days7), datetime.now() ), max_valuedatetime.now() ) with col2: metric_choice st.selectbox( 选择分析指标, [执行时间, Token使用量, API调用次数, 成功率] ) # 过滤数据 if len(date_range) 2: start_date, end_date date_range df_filtered df_perf[ (df_perf[timestamp] pd.Timestamp(start_date)) (df_perf[timestamp] pd.Timestamp(end_date timedelta(days1))) ] else: df_filtered df_perf # 关键指标卡片 st.subheader(关键指标) col1, col2, col3, col4 st.columns(4) with col1: avg_duration df_filtered[duration_seconds].mean() st.metric(平均执行时间, f{avg_duration:.1f}秒) with col2: total_tokens df_filtered[token_usage].sum() st.metric(总Token使用量, f{total_tokens:,}) with col3: success_rate df_filtered[success].mean() * 100 st.metric(任务成功率, f{success_rate:.1f}%) with col4: avg_api_calls df_filtered[api_calls].mean() st.metric(平均API调用, f{avg_api_calls:.1f}次) # 时间趋势图 st.subheader(时间趋势分析) # 按天聚合数据 df_daily df_filtered.copy() df_daily[date] df_daily[timestamp].dt.date daily_stats df_daily.groupby(date).agg({ duration_seconds: mean, token_usage: sum, api_calls: sum, task_id: count }).reset_index() daily_stats.rename(columns{task_id: task_count}, inplaceTrue) # 根据选择的指标绘制图表 if metric_choice 执行时间: fig px.line( daily_stats, xdate, yduration_seconds, title每日平均执行时间趋势, labels{duration_seconds: 平均执行时间(秒), date: 日期} ) elif metric_choice Token使用量: fig px.bar( daily_stats, xdate, ytoken_usage, title每日Token使用量, labels{token_usage: Token使用量, date: 日期} ) elif metric_choice API调用次数: fig px.line( daily_stats, xdate, yapi_calls, title每日API调用次数, labels{api_calls: API调用次数, date: 日期} ) else: # 成功率 success_by_date df_daily.groupby(date)[success].mean().reset_index() success_by_date[success_rate] success_by_date[success] * 100 fig px.line( success_by_date, xdate, ysuccess_rate, title每日任务成功率, labels{success_rate: 成功率(%), date: 日期} ) st.plotly_chart(fig, use_container_widthTrue) # 模型性能对比 st.subheader(模型性能对比) model_stats df_filtered.groupby(model).agg({ duration_seconds: mean, token_usage: mean, success: mean, task_id: count }).reset_index() model_stats.rename(columns{task_id: task_count}, inplaceTrue) col1, col2 st.columns(2) with col1: # 执行时间对比 fig_time px.bar( model_stats, xmodel, yduration_seconds, title各模型平均执行时间, labels{duration_seconds: 平均执行时间(秒), model: 模型} ) st.plotly_chart(fig_time, use_container_widthTrue) with col2: # 成功率对比 fig_success px.bar( model_stats, xmodel, ysuccess, title各模型任务成功率, labels{success: 成功率, model: 模型} ) st.plotly_chart(fig_success, use_container_widthTrue) # 详细数据表格 st.subheader(详细性能数据) st.dataframe( df_filtered.sort_values(timestamp, ascendingFalse).head(20), column_config{ task_id: 任务ID, timestamp: 执行时间, duration_seconds: st.column_config.NumberColumn( 执行时间(秒), format%.1f秒 ), token_usage: Token使用量, api_calls: API调用次数, success: st.column_config.CheckboxColumn(成功), model: 使用模型 }, hide_indexTrue, use_container_widthTrue )4.4 系统配置管理配置管理功能让用户可以直接在Web界面上调整DeerFlow的设置import yaml from pathlib import Path def load_deerflow_config(): 加载DeerFlow配置文件 config_path Path(conf.yaml) if config_path.exists(): with open(config_path, r, encodingutf-8) as f: return yaml.safe_load(f) return {} def save_deerflow_config(config): 保存DeerFlow配置文件 config_path Path(conf.yaml) with open(config_path, w, encodingutf-8) as f: yaml.dump(config, f, default_flow_styleFalse, allow_unicodeTrue) def show_config_management(): st.header(⚙ 系统配置管理) # 加载当前配置 config load_deerflow_config() if not config: st.warning(未找到配置文件将使用默认配置) config { basic_model: { base_url: https://api.openai.com/v1, model: gpt-4, api_key: }, search_api: tavily, max_plan_iterations: 3, max_step_num: 5 } # 配置编辑表单 with st.form(config_form): st.subheader(基础模型配置) col1, col2 st.columns(2) with col1: base_url st.text_input( API基础地址, valueconfig.get(basic_model, {}).get(base_url, https://api.openai.com/v1), helpLLM API的基础地址 ) with col2: model_name st.text_input( 模型名称, valueconfig.get(basic_model, {}).get(model, gpt-4), help使用的模型名称 ) api_key st.text_input( API密钥, valueconfig.get(basic_model, {}).get(api_key, ), typepassword, helpLLM API的访问密钥 ) st.divider() st.subheader(搜索配置) search_engine st.selectbox( 搜索引擎, [tavily, duckduckgo, brave_search, arxiv, infoquest], index[tavily, duckduckgo, brave_search, arxiv, infoquest].index( config.get(search_api, tavily) ) if config.get(search_api) in [tavily, duckduckgo, brave_search, arxiv, infoquest] else 0 ) st.divider() st.subheader(任务配置) col1, col2 st.columns(2) with col1: max_iterations st.number_input( 最大规划迭代次数, min_value1, max_value10, valueconfig.get(max_plan_iterations, 3), help研究计划的最大迭代次数 ) with col2: max_steps st.number_input( 最大步骤数, min_value1, max_value20, valueconfig.get(max_step_num, 5), help每个研究计划的最大步骤数 ) st.divider() # 表单提交按钮 submitted st.form_submit_button( 保存配置) if submitted: # 更新配置 new_config { basic_model: { base_url: base_url, model: model_name, api_key: api_key }, search_api: search_engine, max_plan_iterations: max_iterations, max_step_num: max_steps } try: save_deerflow_config(new_config) st.success(配置保存成功) # 显示重启提示 st.info( **配置已更新需要重启DeerFlow服务使更改生效** 请执行以下操作 1. 停止当前运行的DeerFlow服务 2. 重新启动服务以应用新配置 ) except Exception as e: st.error(f保存配置失败: {e}) # 配置验证 st.divider() st.subheader(配置验证) if st.button( 验证当前配置): validation_results [] # 检查API配置 if api_key: validation_results.append((API密钥, 已配置, success)) else: validation_results.append((API密钥, 未配置, warning)) # 检查模型配置 if model_name: validation_results.append((模型名称, f {model_name}, success)) else: validation_results.append((模型名称, 未配置, error)) # 检查搜索配置 validation_results.append((搜索引擎, f {search_engine}, success)) # 显示验证结果 for item, status, level in validation_results: if level success: st.success(f{item}: {status}) elif level warning: st.warning(f{item}: {status}) else: st.error(f{item}: {status}) # 快速操作 st.divider() st.subheader(快速操作) col1, col2, col3 st.columns(3) with col1: if st.button( 重启服务, use_container_widthTrue): # 这里可以添加重启DeerFlow服务的逻辑 st.info(重启功能需要根据您的部署方式实现) with col2: if st.button( 导出配置, use_container_widthTrue): config_yaml yaml.dump(config, default_flow_styleFalse, allow_unicodeTrue) st.download_button( label下载配置文件, dataconfig_yaml, file_namedeerflow_config_backup.yaml, mimetext/yaml ) with col3: if st.button( 恢复默认, use_container_widthTrue): if st.checkbox(确认恢复默认配置): default_config { basic_model: { base_url: https://api.openai.com/v1, model: gpt-4, api_key: }, search_api: tavily, max_plan_iterations: 3, max_step_num: 5 } save_deerflow_config(default_config) st.success(已恢复默认配置) st.rerun()5. 整合与部署5.1 主应用整合现在我们把所有功能模块整合到一个Streamlit应用中import streamlit as st def main(): # 页面配置 st.set_page_config( page_titleDeerFlow管理控制台, page_icon, layoutwide, initial_sidebar_stateexpanded ) # 自定义CSS样式 st.markdown( style .main-header { font-size: 2.5rem; color: #1E88E5; margin-bottom: 1rem; } .sub-header { font-size: 1.5rem; color: #424242; margin-top: 1.5rem; margin-bottom: 1rem; } .metric-card { background-color: #f8f9fa; padding: 1rem; border-radius: 0.5rem; border-left: 4px solid #1E88E5; } /style , unsafe_allow_htmlTrue) # 侧边栏 with st.sidebar: st.image(https://via.placeholder.com/150x50/1E88E5/FFFFFF?textDeerFlow, width150) st.title( DeerFlow控制台) st.divider() # 导航菜单 page st.radio( 导航菜单, [ 仪表盘, 任务监控, 日志查看, 性能分析, ⚙ 系统配置], label_visibilitycollapsed ) st.divider() # 系统状态 st.subheader(系统状态) # 模拟系统状态数据 col1, col2 st.columns(2) with col1: st.metric(运行中任务, 3) with col2: st.metric(今日任务, 12) st.divider() # 快速操作 st.subheader(快速操作) if st.button( 新建任务, use_container_widthTrue): st.session_state.show_new_task True if st.button( 刷新所有, use_container_widthTrue): st.rerun() st.divider() # 系统信息 st.caption(f最后更新: {datetime.now().strftime(%Y-%m-%d %H:%M:%S)}) st.caption(DeerFlow v1.0.0) # 主内容区 st.markdown(h1 classmain-headerDeerFlow管理控制台/h1, unsafe_allow_htmlTrue) # 根据选择的页面显示对应内容 if page 仪表盘: show_dashboard() elif page 任务监控: show_task_monitor() elif page 日志查看: show_log_viewer() elif page 性能分析: show_performance_dashboard() elif page ⚙ 系统配置: show_config_management() def show_dashboard(): 显示仪表盘页面 st.markdown(h2 classsub-header系统概览/h2, unsafe_allow_htmlTrue) # 关键指标卡片 col1, col2, col3, col4 st.columns(4) with col1: st.markdown( div classmetric-card h3 运行中/h3 h23/h2 p个任务/p /div , unsafe_allow_htmlTrue) with col2: st.markdown( div classmetric-card h3 已完成/h3 h2127/h2 p个任务/p /div , unsafe_allow_htmlTrue) with col3: st.markdown( div classmetric-card h3⏱ 平均时间/h3 h22.3/h2 p分钟/任务/p /div , unsafe_allow_htmlTrue) with col4: st.markdown( div classmetric-card h3 成功率/h3 h294.5%/h2 p任务成功率/p /div , unsafe_allow_htmlTrue) # 最近任务 st.markdown(h2 classsub-header最近任务/h2, unsafe_allow_htmlTrue) # 模拟最近任务数据 recent_tasks [ {id: TASK-001, name: 市场趋势分析, status: 完成, time: 2分钟前}, {id: TASK-002, name: 技术文档生成, status: 运行中, time: 5分钟前}, {id: TASK-003, name: 竞品分析报告, status: 完成, time: 10分钟前}, {id: TASK-004, name: 代码审查, status: 警告, time: 15分钟前}, {id: TASK-005, name: 数据清洗, status: 完成, time: 30分钟前}, ] for task in recent_tasks: with st.container(): col1, col2, col3, col4 st.columns([2, 3, 2, 2]) with col1: st.text(fID: {task[id]}) with col2: st.text(task[name]) with col3: st.text(task[status]) with col4: st.text(task[time]) st.divider() # 系统状态 st.markdown(h2 classsub-header系统状态/h2, unsafe_allow_htmlTrue) col1, col2 st.columns(2) with col1: # CPU/内存使用情况 st.progress(65, textCPU使用率: 65%) st.progress(42, text内存使用率: 42%) with col2: # 磁盘使用情况 st.progress(78, text磁盘使用率: 78%) st.progress(23, text网络负载: 23%) if __name__ __main__: main()5.2 部署与运行部署Streamlit应用非常简单有几种方式本地运行streamlit run deerflow_dashboard.py这会启动一个本地服务器默认在 http://localhost:8501 可以访问。生产环境部署如果你想让团队其他成员也能访问可以考虑以下几种部署方式Streamlit Cloud最简单将代码推送到GitHub在 streamlit.io 上连接你的仓库选择部署分支和主文件设置环境变量API密钥等Docker部署FROM python:3.9-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . EXPOSE 8501 CMD [streamlit, run, deerflow_dashboard.py, --server.port8501, --server.address0.0.0.0]然后构建并运行docker build -t deerflow-dashboard . docker run -p 8501:8501 deerflow-dashboard传统服务器部署安装Python和依赖使用systemd或supervisor管理进程配置Nginx反向代理设置SSL证书如果需要HTTPS安全注意事项不要在公网暴露管理界面除非有认证使用强密码或OAuth认证定期更新依赖包监控应用日志和性能6. 实际使用体验与优化建议我实际用这个管理控制台管理DeerFlow系统已经有一段时间了整体体验还不错但也发现了一些可以改进的地方。使用体验实时性很好任务状态和日志都能实时更新不用手动刷新操作直观团队成员即使不懂技术也能快速上手问题定位快通过日志查看器能快速找到错误原因配置管理方便不用再手动编辑YAML文件了遇到的坑和解决方案日志文件太大DeerFlow运行久了日志文件会很大影响读取性能解决方案添加日志轮转和压缩功能实现按日期或大小自动分割日志实时监控占用资源持续监控日志文件会占用一定CPU解决方案优化监控间隔默认5秒检查一次添加开关不需要时可以关闭监控多用户访问冲突多人同时修改配置会有冲突解决方案添加配置锁机制显示当前谁正在编辑配置性能优化建议对于大量任务数据使用分页加载图表数据使用缓存避免重复计算异步加载耗时操作不阻塞界面功能扩展想法添加任务调度功能可以定时执行研究任务集成告警系统任务失败时发送通知添加数据导出功能支持导出为CSV或PDF实现用户权限管理不同角色看到不同功能7. 总结用Streamlit给DeerFlow搭建管理控制台整个过程比想象中要简单。Streamlit的易用性确实让人惊喜基本上写Python脚本的思路就能做出一个功能完整的Web应用。这个控制台解决了DeerFlow运维中的几个实际问题任务状态不透明、日志查看麻烦、性能监控缺失、配置管理不便。虽然功能上还有优化空间但作为第一版已经能大大提升日常运维效率了。如果你也在用DeerFlow或者类似的需要运维管理的AI系统强烈建议试试Streamlit。它不需要前端知识不需要复杂的部署快速原型开发特别合适。当然如果要做成生产级应用可能还需要考虑安全性、性能优化、用户体验等更多方面。最让我满意的是这个方案的可扩展性很好。随着DeerFlow功能的增加管理控制台也能很容易地添加新功能模块。比如未来如果DeerFlow支持更多类型的任务我们只需要在控制台里加对应的监控和管理功能就行了。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。