天津平台网站建设报价,网站布局方案,如何做网站模特,表格网站源码下面提供使用Spark读取目录文件、判断已处理文件并逐个输出内容的PySpark和Java示例。示例采用简单的持久化记录#xff08;文件列表#xff09;来跟踪已处理文件#xff0c;并演示了核心逻辑。思路说明从指定目录读取所有文件#xff08;使用 wholeTextFiles#xff0c;每…下面提供使用Spark读取目录文件、判断已处理文件并逐个输出内容的PySpark和Java示例。示例采用简单的持久化记录文件列表来跟踪已处理文件并演示了核心逻辑。思路说明从指定目录读取所有文件使用wholeTextFiles每个文件作为一个键值对键为文件路径值为文件内容。从记录文件中加载已处理文件路径集合。过滤出未处理过的文件。逐个处理新文件此处仅打印文件名和内容可替换为实际业务逻辑。将本次处理的文件路径追加到记录文件中便于下次运行时跳过。注意wholeTextFiles适用于小文件场景会将整个文件内容加载到内存。对于大文件应改用其他方式如先列出文件列表再逐个使用textFile读取本例为简化采用前者。1. PySpark 版本from pyspark import SparkContext, SparkConf import os import sys def process_new_files(directory_path, processed_files_path): conf SparkConf().setAppName(ProcessNewFiles-PySpark) sc SparkContext(confconf) # 读取已处理文件列表 processed set() if os.path.exists(processed_files_path): with open(processed_files_path, r) as f: processed set(line.strip() for line in f if line.strip()) # 读取目录下所有文件 (key: 文件绝对路径, value: 文件内容) files_rdd sc.wholeTextFiles(directory_path) # 过滤掉已处理文件 new_files_rdd files_rdd.filter(lambda kv: kv[0] not in processed) # 处理每个新文件打印内容并返回文件路径 def process_file(kv): file_path, content kv print(fProcessing file: {file_path}) print(Content:) print(content) # 这里可加入实际处理逻辑 return file_path # 收集本次处理的所有文件路径 newly_processed new_files_rdd.map(process_file).collect() # 将新处理的文件路径追加到记录文件 if newly_processed: with open(processed_files_path, a) as f: for path in newly_processed: f.write(path \n) sc.stop() if __name__ __main__: if len(sys.argv) ! 3: print(Usage: process_new_files.py directory_path processed_files_path) sys.exit(1) process_new_files(sys.argv[1], sys.argv[2])2. Java 版本import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.io.*; import java.util.HashSet; import java.util.List; import java.util.Set; public class ProcessNewFiles { public static void main(String[] args) { if (args.length ! 2) { System.err.println(Usage: ProcessNewFiles directory_path processed_files_path); System.exit(1); } String directoryPath args[0]; String processedFilesPath args[1]; SparkConf conf new SparkConf().setAppName(ProcessNewFiles-Java); JavaSparkContext sc new JavaSparkContext(conf); // 读取已处理文件列表 SetString processed new HashSet(); try (BufferedReader br new BufferedReader(new FileReader(processedFilesPath))) { String line; while ((line br.readLine()) ! null) { line line.trim(); if (!line.isEmpty()) { processed.add(line); } } } catch (FileNotFoundException e) { // 文件不存在则忽略首次运行 } catch (IOException e) { e.printStackTrace(); sc.close(); return; } // 读取目录下所有文件 JavaPairRDDString, String filesRDD sc.wholeTextFiles(directoryPath); // 过滤已处理文件 JavaPairRDDString, String newFilesRDD filesRDD.filter( (FunctionTuple2String, String, Boolean) kv - !processed.contains(kv._1()) ); // 处理每个新文件打印内容返回文件路径 JavaPairRDDString, String processedInThisRun newFilesRDD.mapToPair( (PairFunctionTuple2String, String, String, String) kv - { String filePath kv._1(); String content kv._2(); System.out.println(Processing file: filePath); System.out.println(Content:); System.out.println(content); return new Tuple2(filePath, ); // 仅保留路径用于记录 } ); // 收集文件路径到Driver ListString newlyProcessed processedInThisRun.keys().collect(); // 追加到记录文件 if (!newlyProcessed.isEmpty()) { try (BufferedWriter bw new BufferedWriter(new FileWriter(processedFilesPath, true))) { for (String path : newlyProcessed) { bw.write(path); bw.newLine(); } } catch (IOException e) { e.printStackTrace(); } } sc.close(); } }3.Scala版本import org.apache.spark.{SparkConf, SparkContext} import java.io.{BufferedWriter, FileWriter, File, PrintWriter} import scala.io.Source object ProcessNewFiles { def main(args: Array[String]): Unit { if (args.length ! 2) { println(Usage: ProcessNewFiles directory_path processed_files_path) System.exit(1) } val directoryPath args(0) val processedFilesPath args(1) // 初始化 Spark val conf new SparkConf().setAppName(ProcessNewFiles-Scala) val sc new SparkContext(conf) // 1. 从记录文件中加载已处理文件路径集合 val processedSet loadProcessedFiles(processedFilesPath) // 2. 读取目录下所有文件生成 (filePath, content) 对 val filesRDD sc.wholeTextFiles(directoryPath) // 3. 过滤掉已处理文件 val newFilesRDD filesRDD.filter { case (path, _) !processedSet.contains(path) } // 4. 处理每个新文件打印文件名和内容可替换为实际逻辑 val newPaths newFilesRDD.map { case (path, content) println(sProcessing file: $path) println(Content:) println(content) // 这里可以加入实际业务处理代码 path // 返回文件路径以便记录 }.collect() // 收集到 Driver // 5. 将本次处理的文件路径追加到记录文件 if (newPaths.nonEmpty) { appendProcessedFiles(processedFilesPath, newPaths) } sc.stop() } /** * 从记录文件中读取已处理的文件路径返回 Set[String] */ def loadProcessedFiles(path: String): Set[String] { val file new File(path) if (file.exists()) { Source.fromFile(file).getLines().map(_.trim).filter(_.nonEmpty).toSet } else { Set.empty[String] } } /** * 将新处理的文件路径追加到记录文件 */ def appendProcessedFiles(path: String, paths: Array[String]): Unit { val writer new BufferedWriter(new FileWriter(path, true)) try { paths.foreach { p writer.write(p) writer.newLine() } } finally { writer.close() } } }4. 准备 Sample 数据并运行创建测试文件在终端执行以下命令创建包含两个小文件的目录并初始化记录文件mkdir -p /tmp/sample_data echo Hello, this is file1 /tmp/sample_data/file1.txt echo Content of file2 /tmp/sample_data/file2.txt touch /tmp/processed_files.txt # 初始为空运行 PySpark 版本假设脚本保存为process_new_files.py执行spark-submit process_new_files.py /tmp/sample_data /tmp/processed_files.txt首次运行会处理 file1.txt 和 file2.txt输出内容并更新/tmp/processed_files.txt。再次运行相同命令将跳过已处理文件不会输出任何内容因为两个文件都已记录。运行 Java 版本编译并打包成 JAR例如process-new-files.jar然后执行spark-submit --class ProcessNewFiles process-new-files.jar /tmp/sample_data /tmp/processed_files.txt行为与 PySpark 版本一致。运行 Scala 版本假设项目结构如下textProcessNewFiles/ ├── build.sbt └── src/main/scala/ProcessNewFiles.scalabuild.sbt内容示例name : ProcessNewFiles version : 1.0 scalaVersion : 2.12.17 // 请根据你的 Spark 版本选择合适的 Scala 版本 libraryDependencies org.apache.spark %% spark-core % 3.5.8 % provided然后执行sbt package生成的 JAR 位于target/scala-2.12/processnewfiles_2.12-1.0.jar。提交运行spark-submit \ --class ProcessNewFiles \ --master local[2] \ target/scala-2.12/processnewfiles_2.12-1.0.jar \ /tmp/sample_data \ /tmp/processed_files.txt5. 注意事项文件路径标识wholeTextFiles返回的键包含file:前缀记录文件中保存的也是完整路径确保一致性。小文件场景上述代码适合文件较小的情况。若文件很大建议先用 Hadoop FileSystem 列出文件名再逐一用textFile读取。并发写入记录文件示例在 Driver 端追加写入适用于单次批处理。若需多应用并发应改用数据库或分布式锁。