入门做网站怀化营销策划网络推广渠道
入门做网站,怀化营销策划网络推广渠道,自己创建网站的注意事项,wordpress访问量阅读量流式计算任务通常需要 7x24 小时长期运行#xff0c;面对网络抖动、机器故障或代码 Bug#xff0c;如何保证任务不挂#xff1f;或者挂了之后能自动恢复且数据不丢、不重#xff1f;这正是 Flink 引以为傲的资本#xff1a;强大的状态管理与基于 Checkpoint 的容错机制。
…流式计算任务通常需要 7x24 小时长期运行面对网络抖动、机器故障或代码 Bug如何保证任务不挂或者挂了之后能自动恢复且数据不丢、不重这正是 Flink 引以为傲的资本强大的状态管理与基于 Checkpoint 的容错机制。本文将带你深入理解 Flink 是如何“记忆”数据的以及它是如何在故障发生时“时光倒流”恢复现场的。一、什么是状态State在流计算中数据是一条条流过的。如果处理一条数据时需要依赖之前的数据例如计算过去一小时的总和、去重、模式匹配那么这些“之前的数据”或“中间计算结果”就是状态。1. 状态的分类Flink 的状态分为两大类Managed State托管状态和Raw State原生状态。我们日常开发 99% 使用的是托管状态由 Flink 运行时自动管理内存、序列化和故障恢复。Managed State 又细分为Keyed State键控状态只能在KeyedStream即keyBy之后上使用。状态是跟 Key 绑定的。Flink 为每个 Key 维护一份独立的状态实例。常用类型ValueState、ListState、MapState、ReducingState、AggregatingState。Operator State算子状态绑定到算子并行实例SubTask与 Key 无关。常用于 Source Connector记录读取的 Offset或 Sink Connector事务控制。常用接口ListState、UnionListState、BroadcastState。二、状态后端State Backends状态存在哪里是内存还是磁盘这由State Backend决定。在 Flink 1.13 之后配置方式简化为以下两种主要模式1. HashMapStateBackend (基于内存)存储位置Java 堆内存Heap。特点读写速度极快对象直接访问无序列化开销。适用场景状态较小例如仅仅是简单的 Count 或去重对延迟极其敏感的场景。缺点受限于 JVM 堆大小容易 GC状态过大时可能 OOM。2. EmbeddedRocksDBStateBackend (基于磁盘)存储位置TaskManager 本地磁盘基于 RocksDB 数据库内存中只作为缓存Off-heap。特点支持超大状态TB 级别不受 JVM 堆限制。适用场景超大窗口、超长周期的聚合、海量 Key 的去重。缺点需要序列化/反序列化读写性能略低于内存版需要调优 RocksDB 参数。3. 配置示例/* by 01022.hk - online tools website : 01022.hk/zh/areacode.html */ StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); // 设置状态后端为 RocksDB env.setStateBackend(new EmbeddedRocksDBStateBackend()); // 配合 Checkpoint 存储路径存储在本地文件系统 env.getCheckpointConfig().setCheckpointStorage(file:///tmp/flink/checkpoints);三、容错核心CheckpointCheckpoint检查点是 Flink 容错机制的灵魂。它是一个全局一致性快照定期将所有算子的状态持久化到远程存储如 HDFS。1. 核心原理Barrier 对齐Flink 使用Chandy-Lamport 算法的变体。Barrier 注入JobManager 向 Source 发送 Checkpoint Barrier。Barrier 流动Barrier 像普通数据一样在流中传输。对齐Alignment当算子有多个输入流时必须等待所有流的 Barrier 到齐才能进行 Snapshot。这保证了状态的一致性即 Exactly-Once。异步快照算子将状态写入远程存储异步过程不阻塞数据处理。确认完成所有算子都完成快照后JobManager 确认 Checkpoint 成功。2. Checkpoint 配置实战默认情况下 Checkpoint 是关闭的生产环境必须开启。/* by 01022.hk - online tools website : 01022.hk/zh/areacode.html */ // 1. 开启 Checkpoint每 5000ms 触发一次 env.enableCheckpointing(5000); // 2. 设置 Checkpoint 模式默认 EXACTLY_ONCE也可以设为 AT_LEAST_ONCE env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 3. 设置两次 Checkpoint 之间的最小间隔防止频繁 Checkpoint 导致性能下降 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // 4. Checkpoint 超时时间默认 10分钟 env.getCheckpointConfig().setCheckpointTimeout(60000); // 5. 允许同时进行的 Checkpoint 数量通常设为 1 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 6. 开启作业取消时保留 Checkpoint非常重要否则 Cancel 任务会删除 Checkpoint env.getCheckpointConfig().setExternalizedCheckpointCleanup( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION ); // 7. 容忍 Checkpoint 失败次数默认 0即 Checkpoint 失败会导致任务重启 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);四、Savepoint手动的超级 Checkpoint虽然 Checkpoint 和 Savepoint 看起来很像都是快照但它们的定位完全不同特性CheckpointSavepoint触发方式Flink 定时自动触发用户手动命令触发主要目的故障恢复Failover运维操作升级、扩容、迁移存储格式增量存储依赖 StateBackend 优化标准格式全量存储可跨版本生命周期随作业生命周期管理除非设置保留用户自行管理删除需手动常用命令# 触发 Savepoint bin/flink savepoint jobId [targetDirectory] # 从 Savepoint 重启作业 (或者 Checkpoint) bin/flink run -s savepointPath ...五、重启策略Restart Strategies当任务发生故障Exception时Flink 会尝试根据配置的策略自动重启。// 1. 固定延迟重启尝试 3 次每次间隔 10秒 env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, Duration.ofSeconds(10) )); // 2. 失败率重启在 5 分钟内失败超过 3 次则停止否则每次间隔 10秒重启 env.setRestartStrategy(RestartStrategies.failureRateRestart( 3, Duration.ofMinutes(5), Duration.ofSeconds(10) )); // 3. 无重启直接失败 env.setRestartStrategy(RestartStrategies.noRestart());六、总结State是 Flink 实现复杂逻辑的记忆。State Backend决定了记忆存哪里内存快但小RocksDB 大但需序列化。Checkpoint是自动化的定期备份保证故障恢复后的数据一致性。Savepoint是手动的高级备份用于版本升级和应用迁移。掌握了状态与容错你的 Flink 任务才算真正具备了“生产级”的健壮性。下一篇我们将探讨 Flink SQL看看如何用 SQL 解决 80% 的流计算需求。原文来自http://blog.daimajiangxin.com.cn源码地址https://gitee.com/daimajiangxin/flink-learning