西安网络公司网站建设,克隆的网站怎么做数据库,大型地方门户网站源码,东莞网站制作很好 乐云践新Whisper-large-v3实操手册#xff1a;批量转录脚本编写进度监控错误日志追踪 作者#xff1a;by113小贝 | 基于OpenAI Whisper Large v3语音识别模型二次开发构建 1. 项目概述与环境准备 Whisper-large-v3作为OpenAI推出的多语言语音识别模型#xff0c;支持99种语言的自动…Whisper-large-v3实操手册批量转录脚本编写进度监控错误日志追踪作者by113小贝 | 基于OpenAI Whisper Large v3语音识别模型二次开发构建1. 项目概述与环境准备Whisper-large-v3作为OpenAI推出的多语言语音识别模型支持99种语言的自动检测与转录在语音转文字领域表现出色。但在实际业务场景中我们往往需要处理大量音频文件手动一个个上传转录显然不现实。本文将手把手教你如何基于Whisper-large-v3构建批量转录解决方案包含完整的脚本编写、进度监控和错误日志追踪功能让你能够高效处理成百上千个音频文件。环境要求确认GPUNVIDIA RTX 4090 D23GB显存或同等性能显卡内存16GB以上系统Ubuntu 24.04 LTS推荐或其他Linux发行版存储至少10GB可用空间模型文件约3GB2. 批量转录核心脚本编写2.1 基础批量处理脚本我们先从最简单的批量转录脚本开始这个脚本能够遍历指定文件夹中的所有音频文件并进行转录import os import whisper import json from datetime import datetime class BatchWhisperTranscriber: def __init__(self, model_sizelarge-v3, devicecuda): print(正在加载Whisper模型...) self.model whisper.load_model(model_size, devicedevice) print(模型加载完成) def transcribe_audio(self, audio_path, output_dirtranscripts): 转录单个音频文件 try: # 确保输出目录存在 os.makedirs(output_dir, exist_okTrue) # 执行转录 result self.model.transcribe(audio_path) # 生成输出文件名 base_name os.path.splitext(os.path.basename(audio_path))[0] output_file os.path.join(output_dir, f{base_name}_transcript.txt) # 保存转录结果 with open(output_file, w, encodingutf-8) as f: f.write(result[text]) # 保存完整结果包含时间戳等信息 json_file os.path.join(output_dir, f{base_name}_full.json) with open(json_file, w, encodingutf-8) as f: json.dump(result, f, ensure_asciiFalse, indent2) return True, result[text] except Exception as e: return False, str(e) def batch_transcribe(self, input_dir, extensions[.wav, .mp3, .m4a]): 批量转录目录中的所有音频文件 success_count 0 fail_count 0 results [] # 收集所有音频文件 audio_files [] for ext in extensions: audio_files.extend([f for f in os.listdir(input_dir) if f.endswith(ext)]) print(f找到 {len(audio_files)} 个音频文件) for i, audio_file in enumerate(audio_files, 1): audio_path os.path.join(input_dir, audio_file) print(f正在处理 ({i}/{len(audio_files)}): {audio_file}) success, result self.transcribe_audio(audio_path) if success: success_count 1 results.append({ file: audio_file, status: success, text: result[:100] ... if len(result) 100 else result }) else: fail_count 1 results.append({ file: audio_file, status: failed, error: result }) return { total: len(audio_files), success: success_count, failed: fail_count, results: results } # 使用示例 if __name__ __main__: transcriber BatchWhisperTranscriber() result transcriber.batch_transcribe(/path/to/your/audio/files) print(f处理完成成功: {result[success]}, 失败: {result[failed]})2.2 支持多种音频格式Whisper-large-v3支持多种音频格式但为了确保兼容性我们可以在脚本中加入格式检查和转换功能import subprocess def convert_to_wav_if_needed(audio_path, target_dirconverted): 将音频文件转换为WAV格式以确保兼容性 os.makedirs(target_dir, exist_okTrue) base_name os.path.splitext(os.path.basename(audio_path))[0] output_path os.path.join(target_dir, f{base_name}.wav) # 如果已经是WAV格式直接返回原路径 if audio_path.lower().endswith(.wav): return audio_path # 使用FFmpeg进行格式转换 try: command [ ffmpeg, -i, audio_path, -acodec, pcm_s16le, -ar, 16000, -ac, 1, output_path, -y # 覆盖已存在文件 ] subprocess.run(command, checkTrue, capture_outputTrue) return output_path except subprocess.CalledProcessError as e: raise Exception(f音频转换失败: {e.stderr.decode()}) # 在transcribe_audio方法中添加格式转换 def transcribe_audio(self, audio_path, output_dirtranscripts): try: # 确保输出目录存在 os.makedirs(output_dir, exist_okTrue) # 转换音频格式如果需要 converted_path convert_to_wav_if_needed(audio_path) # 执行转录使用转换后的文件 result self.model.transcribe(converted_path) # ...其余代码保持不变3. 进度监控系统实现3.1 实时进度显示对于大量文件的处理实时进度监控非常重要。我们可以使用tqdm库来创建美观的进度条from tqdm import tqdm import time def batch_transcribe_with_progress(self, input_dir, extensions[.wav, .mp3, .m4a]): 带进度条的批量转录 # 收集音频文件 audio_files [] for ext in extensions: audio_files.extend([f for f in os.listdir(input_dir) if f.endswith(ext)]) print(f找到 {len(audio_files)} 个音频文件) success_count 0 fail_count 0 results [] # 创建进度条 with tqdm(totallen(audio_files), desc转录进度) as pbar: for audio_file in audio_files: audio_path os.path.join(input_dir, audio_file) # 更新进度条描述 pbar.set_description(f处理: {audio_file[:20]}...) start_time time.time() success, result self.transcribe_audio(audio_path) processing_time time.time() - start_time if success: success_count 1 results.append({ file: audio_file, status: success, processing_time: round(processing_time, 2) }) else: fail_count 1 results.append({ file: audio_file, status: failed, error: result, processing_time: round(processing_time, 2) }) # 更新进度条 pbar.update(1) pbar.set_postfix({ 成功: success_count, 失败: fail_count, 当前耗时: f{processing_time:.2f}s }) return { total: len(audio_files), success: success_count, failed: fail_count, results: results }3.2 详细统计信息添加更详细的统计信息帮助分析处理效率def generate_statistics(self, results): 生成处理统计信息 total_time sum([r.get(processing_time, 0) for r in results]) success_files [r for r in results if r[status] success] failed_files [r for r in results if r[status] failed] avg_time total_time / len(results) if results else 0 success_avg sum(r[processing_time] for r in success_files) / len(success_files) if success_files else 0 stats { 处理日期: datetime.now().strftime(%Y-%m-%d %H:%M:%S), 总文件数: len(results), 成功数: len(success_files), 失败数: len(failed_files), 成功率: f{(len(success_files)/len(results)*100):.2f}% if results else 0%, 总处理时间: f{total_time:.2f}秒, 平均处理时间: f{avg_time:.2f}秒/文件, 成功文件平均时间: f{success_avg:.2f}秒/文件, } return stats # 在批量处理完成后调用 result_data transcriber.batch_transcribe_with_progress(/path/to/audio) stats transcriber.generate_statistics(result_data[results]) print(\n 处理统计 ) for key, value in stats.items(): print(f{key}: {value})4. 错误日志追踪系统4.1 完整的日志记录建立完善的错误日志系统确保所有问题都能被追踪和排查import logging from logging.handlers import RotatingFileHandler def setup_logging(log_dirlogs): 设置日志系统 os.makedirs(log_dir, exist_okTrue) # 创建logger logger logging.getLogger(WhisperBatchProcessor) logger.setLevel(logging.DEBUG) # 防止重复添加handler if logger.handlers: return logger # 文件handler按日期滚动 log_file os.path.join(log_dir, fwhisper_batch_{datetime.now().strftime(%Y%m%d)}.log) file_handler RotatingFileHandler( log_file, maxBytes10*1024*1024, backupCount5, encodingutf-8 ) file_handler.setLevel(logging.DEBUG) # 控制台handler console_handler logging.StreamHandler() console_handler.setLevel(logging.INFO) # 格式设置 formatter logging.Formatter( %(asctime)s - %(name)s - %(levelname)s - %(message)s ) file_handler.setFormatter(formatter) console_handler.setFormatter(formatter) # 添加handler logger.addHandler(file_handler) logger.addHandler(console_handler) return logger # 在类中集成日志系统 class BatchWhisperTranscriber: def __init__(self, model_sizelarge-v3, devicecuda): self.logger setup_logging() self.logger.info(初始化Whisper批量转录器) try: self.logger.info(正在加载Whisper模型...) self.model whisper.load_model(model_size, devicedevice) self.logger.info(模型加载完成) except Exception as e: self.logger.error(f模型加载失败: {str(e)}) raise4.2 错误分类与处理针对不同类型的错误进行分类处理def transcribe_audio(self, audio_path, output_dirtranscripts): try: # 检查文件是否存在 if not os.path.exists(audio_path): error_msg f文件不存在: {audio_path} self.logger.error(error_msg) return False, error_msg # 检查文件大小 file_size os.path.getsize(audio_path) / (1024 * 1024) # MB if file_size 100: # 100MB限制 error_msg f文件过大: {file_size:.2f}MB 100MB限制 self.logger.warning(error_msg) return False, error_msg # 执行转录 self.logger.info(f开始转录: {os.path.basename(audio_path)}) start_time time.time() result self.model.transcribe(audio_path) processing_time time.time() - start_time self.logger.info(f转录完成: {os.path.basename(audio_path)}, 耗时: {processing_time:.2f}秒) # 保存结果 base_name os.path.splitext(os.path.basename(audio_path))[0] output_file os.path.join(output_dir, f{base_name}_transcript.txt) with open(output_file, w, encodingutf-8) as f: f.write(result[text]) # 记录成功信息 self.logger.debug(f结果已保存: {output_file}) return True, result[text] except Exception as e: error_msg f转录失败 {audio_path}: {str(e)} self.logger.error(error_msg, exc_infoTrue) # 记录完整堆栈信息 return False, error_msg4.3 错误报告生成生成详细的错误报告方便后续排查def generate_error_report(self, results, report_fileerror_report.html): 生成HTML格式的错误报告 error_results [r for r in results if r[status] failed] if not error_results: self.logger.info(没有错误需要报告) return html_content f !DOCTYPE html html head titleWhisper批量转录错误报告/title style body {{ font-family: Arial, sans-serif; margin: 40px; }} h1 {{ color: #d32f2f; }} .error {{ background: #ffebee; padding: 15px; margin: 10px 0; border-radius: 5px; }} .file {{ font-weight: bold; color: #1976d2; }} .timestamp {{ color: #757575; font-size: 0.9em; }} /style /head body h1Whisper批量转录错误报告/h1 p生成时间: {datetime.now().strftime(%Y-%m-%d %H:%M:%S)}/p p总错误数: {len(error_results)}/p h2错误详情/h2 for error in error_results: html_content f div classerror div classfile文件: {error[file]}/div div classtimestamp处理时间: {error.get(processing_time, N/A)}秒/div div classmessage错误信息: {error[error]}/div /div html_content /body /html with open(report_file, w, encodingutf-8) as f: f.write(html_content) self.logger.info(f错误报告已生成: {report_file})5. 完整实战示例5.1 最终整合脚本将以上所有功能整合成一个完整的批量转录脚本#!/usr/bin/env python3 Whisper-large-v3批量转录脚本 支持进度监控、错误日志、格式转换等功能 作者: by113小贝 import os import whisper import json import time import logging import argparse from datetime import datetime from tqdm import tqdm from logging.handlers import RotatingFileHandler class AdvancedWhisperTranscriber: def __init__(self, model_sizelarge-v3, devicecuda, log_dirlogs): self.setup_logging(log_dir) self.logger.info(初始化高级Whisper转录器) try: self.logger.info(f正在加载{model_size}模型...) self.model whisper.load_model(model_size, devicedevice) self.logger.info(模型加载完成) except Exception as e: self.logger.error(f模型加载失败: {str(e)}, exc_infoTrue) raise def setup_logging(self, log_dir): 设置日志系统 os.makedirs(log_dir, exist_okTrue) self.logger logging.getLogger(AdvancedWhisperTranscriber) self.logger.setLevel(logging.DEBUG) # 文件handler log_file os.path.join(log_dir, fwhisper_{datetime.now().strftime(%Y%m%d_%H%M%S)}.log) file_handler RotatingFileHandler(log_file, maxBytes10*1024*1024, backupCount5) file_handler.setLevel(logging.DEBUG) # 控制台handler console_handler logging.StreamHandler() console_handler.setLevel(logging.INFO) # 格式设置 formatter logging.Formatter(%(asctime)s - %(levelname)s - %(message)s) file_handler.setFormatter(formatter) console_handler.setFormatter(formatter) self.logger.addHandler(file_handler) self.logger.addHandler(console_handler) def batch_transcribe(self, input_dir, output_dirtranscripts, extensions[.wav, .mp3, .m4a]): 执行批量转录 # 收集文件 audio_files self.collect_audio_files(input_dir, extensions) if not audio_files: self.logger.warning(未找到音频文件) return {status: no_files} self.logger.info(f开始处理 {len(audio_files)} 个音频文件) results [] success_count 0 # 使用进度条 with tqdm(totallen(audio_files), desc批量转录) as pbar: for audio_file in audio_files: audio_path os.path.join(input_dir, audio_file) pbar.set_description(f处理: {audio_file[:15]}...) start_time time.time() success, result self.transcribe_single_file(audio_path, output_dir) processing_time time.time() - start_time result_data { file: audio_file, status: success if success else failed, processing_time: round(processing_time, 2), result: result if success else None, error: result if not success else None } results.append(result_data) if success: success_count 1 self.logger.info(f成功: {audio_file} ({processing_time:.2f}s)) else: self.logger.error(f失败: {audio_file} - {result}) pbar.update(1) pbar.set_postfix({ 成功: success_count, 失败: len(results) - success_count }) # 生成报告 self.generate_reports(results, output_dir) return { total: len(audio_files), success: success_count, failed: len(audio_files) - success_count, results: results } def transcribe_single_file(self, audio_path, output_dir): 转录单个文件 try: if not os.path.exists(audio_path): return False, f文件不存在: {audio_path} # 执行转录 result self.model.transcribe(audio_path) # 保存结果 os.makedirs(output_dir, exist_okTrue) base_name os.path.splitext(os.path.basename(audio_path))[0] # 文本结果 txt_file os.path.join(output_dir, f{base_name}.txt) with open(txt_file, w, encodingutf-8) as f: f.write(result[text]) # 完整JSON结果 json_file os.path.join(output_dir, f{base_name}.json) with open(json_file, w, encodingutf-8) as f: json.dump(result, f, ensure_asciiFalse, indent2) return True, result[text] except Exception as e: return False, f转录错误: {str(e)} def collect_audio_files(self, input_dir, extensions): 收集音频文件 audio_files [] for ext in extensions: for file in os.listdir(input_dir): if file.lower().endswith(ext): audio_files.append(file) return audio_files def generate_reports(self, results, output_dir): 生成统计报告和错误报告 # 确保输出目录存在 report_dir os.path.join(output_dir, reports) os.makedirs(report_dir, exist_okTrue) # 生成统计报告 self.generate_statistics_report(results, report_dir) # 生成错误报告 self.generate_error_report(results, report_dir) def generate_statistics_report(self, results, report_dir): 生成统计报告 success_files [r for r in results if r[status] success] failed_files [r for r in results if r[status] failed] total_time sum(r[processing_time] for r in results) avg_time total_time / len(results) if results else 0 stats { 处理日期: datetime.now().strftime(%Y-%m-%d %H:%M:%S), 总文件数: len(results), 成功数: len(success_files), 失败数: len(failed_files), 成功率: f{(len(success_files)/len(results)*100):.2f}% if results else 0%, 总处理时间: f{total_time:.2f}秒, 平均处理时间: f{avg_time:.2f}秒/文件, } # 保存为JSON stats_file os.path.join(report_dir, statistics.json) with open(stats_file, w, encodingutf-8) as f: json.dump(stats, f, ensure_asciiFalse, indent2) self.logger.info(f统计报告已生成: {stats_file}) def generate_error_report(self, results, report_dir): 生成错误报告 error_results [r for r in results if r[status] failed] if not error_results: self.logger.info(没有错误需要报告) return error_file os.path.join(report_dir, errors.json) with open(error_file, w, encodingutf-8) as f: json.dump(error_results, f, ensure_asciiFalse, indent2) self.logger.info(f错误报告已生成: {error_file}) def main(): 主函数 parser argparse.ArgumentParser(descriptionWhisper-large-v3批量转录工具) parser.add_argument(input_dir, help输入音频文件目录) parser.add_argument(-o, --output, defaulttranscripts, help输出目录) parser.add_argument(-m, --model, defaultlarge-v3, helpWhisper模型大小) parser.add_argument(--device, defaultcuda, help运行设备 (cuda/cpu)) args parser.parse_args() # 检查输入目录 if not os.path.exists(args.input_dir): print(f错误: 输入目录不存在 {args.input_dir}) return # 创建转录器 try: transcriber AdvancedWhisperTranscriber( model_sizeargs.model, deviceargs.device ) # 执行批量转录 result transcriber.batch_transcribe(args.input_dir, args.output) print(f\n处理完成) print(f总文件: {result[total]}) print(f成功: {result[success]}) print(f失败: {result[failed]}) except Exception as e: print(f程序执行失败: {e}) if __name__ __main__: main()5.2 使用方式保存上述脚本为whisper_batch.py然后通过命令行使用# 基本用法 python whisper_batch.py /path/to/audio/files # 指定输出目录 python whisper_batch.py /path/to/audio/files -o ./my_transcripts # 使用不同模型如果显存不足 python whisper_batch.py /path/to/audio/files -m medium # 使用CPU运行 python whisper_batch.py /path/to/audio/files --device cpu6. 总结与最佳实践通过本文的批量转录解决方案你現在可以高效处理大量音频自动遍历文件夹中的音频文件并进行批量转录实时监控进度使用进度条直观显示处理状态和预估时间完善的错误处理自动记录错误日志生成详细的错误报告多种格式支持支持WAV、MP3、M4A等常见音频格式完整的结果输出同时保存文本结果和包含时间戳的完整JSON结果最佳实践建议分批处理对于大量文件建议分批处理每次100-200个文件避免内存泄漏风险定期清理转录完成后及时清理临时文件和转换后的音频文件监控GPU内存使用nvidia-smi监控GPU内存使用情况确保不会爆显存错误重试机制可以添加错误重试功能对因临时问题导致的失败进行重试结果验证对于重要转录任务建议随机抽查部分结果进行人工验证性能优化提示使用SSD硬盘存储音频文件加快读取速度确保有足够的系统内存避免频繁的磁盘交换对于超长音频考虑先进行分割再转录根据实际需求选择合适的模型大小平衡精度和速度获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。