专业电子科技网站建设,我的主页,二手车网站怎么做的,苏州企业网站设计企业Nomic-Embed-Text-V2-MoE数据管道构建#xff1a;Python入门者也能懂的批量文本向量化脚本 你是不是遇到过这样的场景#xff1f;手头有一堆文档、评论或者产品描述#xff0c;想用AI模型把它们变成计算机能理解的“向量”#xff0c;但面对API调用、文件读写和错误处理&a…Nomic-Embed-Text-V2-MoE数据管道构建Python入门者也能懂的批量文本向量化脚本你是不是遇到过这样的场景手头有一堆文档、评论或者产品描述想用AI模型把它们变成计算机能理解的“向量”但面对API调用、文件读写和错误处理感觉有点无从下手。别担心今天我们就来一起写一个Python脚本专门解决这个问题。这个脚本的目标很明确帮你把本地的一堆文本文件安全、高效地转换成向量并且能应对中途可能出现的各种小意外。即使你刚学Python不久跟着步骤走也能轻松搞定。1. 我们要做什么先理清思路在动手写代码之前我们得先想清楚整个流程。你可以把这个脚本想象成一个智能的文本加工流水线。流水线的工作步骤是这样的原料入口脚本需要知道去哪里拿文本文件比如一个装满TXT文件的文件夹或者一个CSV表格。预处理流水线会检查这些“原料”是否合格比如文件能不能打开文本格式对不对把不合格的挑出来。核心加工这是最关键的一步把合格的文本一块一块地送给Nomic-Embed-Text-V2-MoE这个“加工机器”让它吐出对应的向量。应对意外加工过程中网络可能会抖一下或者某块“原料”形状太奇怪机器处理不了。流水线不能因此就全线崩溃它得能记录下问题然后跳过有问题的部分继续工作。成品出口加工好的向量需要整齐地存放起来方便以后使用。我们可以把它存成.npy文件一种专门存数组的格式或者直接写进数据库里。整个脚本的核心就是让上面这个过程自动化、稳定化。下面我们就一步步把它实现出来。2. 搭建你的工作环境工欲善其事必先利其器。我们先来把需要的工具准备好。2.1 安装必要的Python库打开你的终端Windows上是命令提示符或PowerShellMac/Linux上是Terminal输入下面的命令来安装我们需要的“工具包”。pip install requests numpy pandas tqdm我来简单解释一下这几个工具包是干嘛的requests这是Python里最常用的HTTP库简单说就是用来和网络上的API比如我们的向量化模型服务对话的。numpy这是科学计算的基础包我们生成的向量本质上就是一堆数字用numpy来存储和处理这些数字数组非常高效。pandas处理表格数据比如CSV、Excel的利器。如果你的文本数据在CSV文件里用pandas读取会非常方便。tqdm一个能给你的循环加上美观进度条的小工具。当处理成百上千个文本时有个进度条会让你心里有底得多。2.2 准备你的API访问凭证要调用Nomic-Embed-Text-V2-MoE的API你需要一个API Key。这就像一把钥匙证明你有权限使用这个服务。通常你可以在模型的官方平台或你部署的服务后台找到它。拿到之后我们最好别把它硬编码在脚本里。一个更安全、更灵活的做法是把它放在环境变量里。在Linux/Mac的终端里可以这样设置export NOMIC_API_KEY你的实际API密钥在Windows的命令提示符里可以这样设置set NOMIC_API_KEY你的实际API密钥这样我们的脚本运行时就能从系统环境里读取这个密钥既安全又方便在不同机器间迁移。3. 从零开始编写脚本好了工具备齐思路清晰现在打开你喜欢的代码编辑器比如VSCode、PyCharm甚至记事本也行新建一个文件命名为batch_embed.py我们开始写代码。3.1 脚本的“开头”引入工具和设置每个Python脚本的开头都是引入需要的库和定义一些固定的配置。# batch_embed.py import os import json import time from pathlib import Path import requests import numpy as np import pandas as pd from tqdm import tqdm # 配置部分 API_URL https://api.nomic.ai/v1/embed # 假设的API端点请根据实际服务地址修改 API_KEY os.environ.get(NOMIC_API_KEY) # 从环境变量读取API密钥 HEADERS { Authorization: fBearer {API_KEY}, Content-Type: application/json } BATCH_SIZE 32 # 一次发送多少文本给API太小效率低太大可能超限32是个常用值 MAX_RETRIES 3 # 网络请求失败时重试的最大次数 RETRY_DELAY 2 # 重试前等待的秒数 # 检查API密钥是否设置 if not API_KEY: print(错误未找到环境变量 NOMIC_API_KEY。) print(请通过命令设置例如export NOMIC_API_KEYyour_key_here) exit(1)这段代码做了几件事import了所有我们需要的库。定义了API的地址、请求头里面包含了认证信息。设置了一些参数BATCH_SIZE决定了一次处理多少文本MAX_RETRIES和RETRY_DELAY是为了应对网络不稳定。最后检查了一下环境变量里有没有API密钥没有的话就友好地提示用户并退出。3.2 核心功能一读取各种格式的文本我们的文本可能来自不同的地方。我们来写一个函数让它能智能地处理不同格式的输入。def load_texts_from_source(source_path): 从不同来源加载文本数据。 支持单个TXT文件、包含多个TXT的文件夹、CSV文件。 参数: source_path (str): 文件或文件夹路径 返回: list: 文本字符串列表 list: 对应的标识符列表如文件名、行号等 texts [] ids [] path Path(source_path) if path.is_file(): # 处理单个文件 if path.suffix .txt: with open(path, r, encodingutf-8) as f: # 按行读取去掉空行 file_texts [line.strip() for line in f if line.strip()] texts.extend(file_texts) ids.extend([f{path.name}_line{i} for i in range(len(file_texts))]) elif path.suffix .csv: # 假设CSV文件有一列叫text存放文本 try: df pd.read_csv(path) if text in df.columns: texts df[text].astype(str).tolist() # 如果没有id列就用行号 if id in df.columns: ids df[id].astype(str).tolist() else: ids [f{path.name}_row{i} for i in range(len(df))] else: print(f警告CSV文件 {path} 中没有找到 text 列。) return [], [] except Exception as e: print(f读取CSV文件 {path} 时出错{e}) return [], [] elif path.is_dir(): # 处理文件夹下的所有txt文件 txt_files list(path.glob(*.txt)) for txt_file in txt_files: with open(txt_file, r, encodingutf-8) as f: content f.read().strip() if content: # 只添加非空内容 texts.append(content) ids.append(txt_file.name) else: print(f错误路径 {source_path} 不存在。) return [], [] print(f成功从 {source_path} 加载了 {len(texts)} 条文本。) return texts, ids这个函数就像一个智能的文本收集器。你给它一个路径它自己判断这是文件还是文件夹是TXT还是CSV然后按照相应的方式把文本内容读出来并且为每段文本生成一个简单的ID比如文件名加行号方便我们后续追踪。3.3 核心功能二调用API并优雅地处理错误这是整个脚本的“发动机”。它负责把文本发送出去把向量拿回来并且在这个过程中保持稳定。def embed_batch_with_retry(texts_batch): 发送一批文本到嵌入API并带有重试机制。 参数: texts_batch (list): 一批文本字符串 返回: np.ndarray: 嵌入向量数组失败时返回None payload { model: nomic-embed-text-v2-moe, # 指定模型 texts: texts_batch } for attempt in range(MAX_RETRIES): try: response requests.post(API_URL, headersHEADERS, jsonpayload, timeout30) response.raise_for_status() # 如果状态码不是200会抛出异常 result response.json() # 假设API返回的向量在 embeddings 字段里 embeddings np.array(result.get(embeddings, [])) if len(embeddings) ! len(texts_batch): print(f警告API返回的向量数量({len(embeddings)})与请求的文本数量({len(texts_batch)})不符。) return None return embeddings except requests.exceptions.Timeout: print(f请求超时第 {attempt 1} 次重试...) except requests.exceptions.ConnectionError: print(f网络连接错误第 {attempt 1} 次重试...) except requests.exceptions.HTTPError as e: # HTTP错误如401认证失败429请求过多通常重试无用 print(fHTTP错误: {e}) if response.status_code in [401, 403, 429]: print(认证或限流错误请检查API密钥和调用频率。停止重试。) break except Exception as e: print(f未知错误: {e}) # 如果不是最后一次尝试则等待后重试 if attempt MAX_RETRIES - 1: time.sleep(RETRY_DELAY * (attempt 1)) # 等待时间逐渐变长 print(f经过 {MAX_RETRIES} 次尝试批量处理失败。) return None这个函数是稳定性的关键。它用了一个for循环来实现重试。如果一次请求因为网络波动超时、断连失败了它会等一会儿再试而不是直接放弃。同时它也区分了错误类型比如如果是API密钥错了401错误重试再多次也没用它会直接停止并给出明确提示。3.4 核心功能三保存我们的劳动成果向量生成好了得找个地方存起来。这里我们提供两种最常用的方式。def save_embeddings(embeddings, ids, output_formatnpy, output_pathembeddings): 将嵌入向量保存到文件或数据库。 参数: embeddings (np.ndarray): 向量数组 ids (list): 对应的标识符列表 output_format (str): 输出格式npy 或 csv output_path (str): 输出文件的基础路径 if embeddings is None or len(embeddings) 0: print(没有嵌入向量可保存。) return os.makedirs(os.path.dirname(output_path) or ., exist_okTrue) if output_format.lower() npy: # 保存为NumPy的二进制格式体积小加载快 np.save(f{output_path}.npy, embeddings) # 把ID单独存到一个文件方便对应 with open(f{output_path}_ids.txt, w, encodingutf-8) as f: for id_ in ids: f.write(f{id_}\n) print(f向量已保存至 {output_path}.npy, ID列表保存至 {output_path}_ids.txt) elif output_format.lower() csv: # 保存为CSV人类可读但文件可能很大 # 将高维向量展平为一列每个向量变成一行逗号分隔的数字 df pd.DataFrame(embeddings) df.insert(0, id, ids) # 在第一列插入ID df.to_csv(f{output_path}.csv, indexFalse) print(f向量已保存至 {output_path}.csv) else: print(f不支持的输出格式{output_format})存为.npy文件是效率最高的选择尤其是向量维度很高、数据量很大的时候。存为CSV的好处是你可以直接用Excel打开看一眼虽然几百维的数字你也看不过来但文件会变得非常大。你可以根据后续的使用需求来选择。3.5 把一切组装起来主流程函数现在我们把上面所有的零件组装成一条完整的自动化流水线。def main(input_path, output_pathoutput/embeddings, output_formatnpy, batch_sizeBATCH_SIZE): 主函数协调整个文本嵌入流程。 print( 开始批量文本向量化处理 ) # 1. 加载文本 print(f正在从 {input_path} 加载文本...) all_texts, all_ids load_texts_from_source(input_path) if not all_texts: print(未加载到有效文本程序退出。) return # 2. 准备存储结果 all_embeddings [] successful_ids [] failed_batches [] # 3. 分批处理并显示进度条 num_batches (len(all_texts) batch_size - 1) // batch_size print(f共 {len(all_texts)} 条文本将分为 {num_batches} 批进行处理。) for i in tqdm(range(0, len(all_texts), batch_size), desc处理批次): batch_texts all_texts[i:i batch_size] batch_ids all_ids[i:i batch_size] # 4. 调用API获取本批向量 embeddings embed_batch_with_retry(batch_texts) if embeddings is not None: all_embeddings.append(embeddings) successful_ids.extend(batch_ids) else: print(f第 {i//batch_size 1} 批处理失败已跳过。) failed_batches.append(i // batch_size 1) # 5. 合并所有成功的向量 if all_embeddings: final_embeddings np.vstack(all_embeddings) print(f处理完成成功生成 {final_embeddings.shape[0]} 个向量维度为 {final_embeddings.shape[1]}。) if failed_batches: print(f注意第 {failed_batches} 批处理失败相关文本已被跳过。) # 可以选择把失败的ID也保存下来方便后续排查 with open(f{output_path}_failed_batches.txt, w) as f: f.write(f失败的批次号{failed_batches}\n) # 6. 保存结果 save_embeddings(final_embeddings, successful_ids, output_format, output_path) else: print(处理失败未生成任何有效向量。) print( 处理结束 )这个main函数就是我们脚本的“总指挥”。它按顺序执行了加载、分批、处理、保存的全过程并且在整个过程中通过tqdm展示了进度条让你能实时看到处理到哪一步了。它还把处理失败的情况记录下来让你知道哪些数据出了问题而不是稀里糊涂地结束。3.6 让脚本能直接运行最后我们加上这两行代码这样你就可以在终端里直接运行这个Python文件了。if __name__ __main__: # 这里可以设置你的输入文件路径和输出参数 input_file ./data/my_texts.csv # 改成你的文件或文件夹路径 main(input_pathinput_file, output_path./results/my_embeddings, output_formatnpy)4. 试试看跑起来脚本写完了我们来实际运行一下。假设你的文本数据在一个叫documents.csv的文件里你可以这样操作把documents.csv放在和batch_embed.py同一个目录下或者记住它的完整路径。修改脚本最后input_file的路径为你的文件路径比如./documents.csv。打开终端进入到脚本所在的目录。运行命令python batch_embed.py如果一切顺利你会看到终端里出现进度条一行行的处理信息最后告诉你生成了多少个向量保存在了哪里。第一次运行可能会遇到的小问题API地址不对记得把API_URL变量改成你实际使用的Nomic嵌入服务端点。CSV列名不对如果CSV里存放文本的列不叫text记得修改load_texts_from_source函数里对应的列名。内存不足如果你有几十万条文本一次性加载所有向量到内存np.vstack可能会吃光内存。对于超大数据集更稳妥的做法是每处理完一批就立即保存到磁盘比如每批存一个.npy文件最后再考虑合并。5. 回顾与下一步整个脚本写下来你会发现它并不复杂但考虑得比较周全。它解决了从文件读取、批量处理、网络容错到结果保存这一整套流程。对于入门者来说这是一个非常实用的“脚手架”。你可以在这个基础上继续添砖加瓦比如增加日志功能把运行信息记录到文件里而不是只打印在屏幕上。支持从数据库直接读取文本。添加一个配置文件比如config.yaml把API地址、批大小等参数都放在外面不用改代码就能调整。尝试不同的模型参数看看对生成向量的效果有什么影响。最重要的是你亲手搭建了一个能解决实际问题的自动化工具。下次再遇到需要批量处理文本向量的任务直接把这个脚本拿出来改改路径就能用省时省力。编程的乐趣和实用性就在于此。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。