Flink是为状态流处理而生。然而什么是状态呢?状态可以简单理解为Flink应用中算子处理数据所产生的信息,这些信息将影响后续数据的处理。
状态是大多数复杂流处理程序所必须的一项基础概念,例如:
- 匹配事件流中的pattern,需要将已经接受到的事件存储到状态中
- 每分钟的事件聚合,需要用状态来缓存待聚合事件。
- 通过事件流训练模型,需要用状态存储当前的模型参数。
然而在实际生产环境中,只有支持容错的状态才是有意义的。“支持容错”意味着无论是出现软件还是硬件错误,计算的最终结果总是准确的,不会出现数据丢失或者重复计算。
Flink的容错机制非常强大且受欢迎,它可以最小化软硬件失败对业务的影响,使得对exactly-once语义的支持成为可能。
Flink实现状态容错的核心就是Checkpointing(检查点机制)。Checkpoint是Flink应用的定期状态全局异步快照,通常被存储到持久化存储中(例如:分布式文件系统)。当发生故障时Flink会将最近一个完成的Checkpoint作为起点重启应用。有一些用户的Flink应用状态可达数GB甚至是TB,Checkpoint会非常慢且消耗资源,这也是Flink1.3引入“增量checkpoint”的原因。
在引入增量Checkpoint前,每一个Checkpoint都会包含应用的完整状态,但通常两次相邻的Checkpoint数据差距并不大,所以这样意义并不大,而增量回溯则只会写入相邻Checkpoint的增量数据。
对于大状态应用来讲,增量Checkpoint能够显著提升其性能。早期测试显示,一个具有TB级别状态的应用使用增量Checkpoint后不需要每次写入完整的状态数据,Checkpoint时间从3分钟缩短到30秒。
如何使用增量Checkpoint:
Java:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend(filebackend, true));
Scala:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new RocksDBStateBackend(filebackend, true))
默认情况下,Flink只保留一个已完成Checkpoint,可以通过如下参数配置:
state.checkpoints.num-retained
工作原理
Flink增量Checkpoint是以 RocksDB的Checkpoint为基础。RocksDB是一个基于LSM树的KV存储,LSM树会将修改写入memtable(基于内存的buffer)中。对于同一个key的更新将会替代上一个版本的值,当memtable写满后RocksDB会将数据按照key排序压缩后写入到磁盘中,磁盘中的数据是不可变的,被称为 ‘sorted-string-table’(sstable) 。
‘Compaction’作为一个后台任务会将可能存在重复key的sstable文件进行合并,随着时间推移原始sstable被删除,取而代之的是合并后的sstable。
在此基础之上,Flink可以追踪到从上次Checkpoint以来RocksDB创建和删除了哪些sstable文件,又因为sstable是不可变的,Flink以此来获取状态的变化。为了实现这一目标,Flink会先触发RocksDB的flush操作,强制所有的memtable写入到sstable文件中,并在本地目录创建一个硬连接。这个过程对于流水线是同步的,但后续的所有操作都是异步的,不会阻塞数据流。
然后,Flink将所有生成的sstable复制到持久化存储(例如:HDFS,S3)中,给新的checkpoint来引用。Flink不会拷贝已经存在的文件,而是直接重新引用。任何新的Checkpoint都不会引用被删除的sstable,因为删除的sstable是compaction的结果,最终将会被合并后的sstable取代。这就是Flink增量Checkpoint裁剪checkpoint历史的方式。
为了追踪两次checkpoint之间的修改,上传已合并的sstable是一项冗余的步骤。Flink会增量处理这一过程,通常情况下只会增加少量的开销,因此我们认为这是值得的,因为这可以让Flink保存更少的历史checkpoint用于故障恢复。
示例:
举例来讲:有一个operator的subtask使用了keyed state,保留的checkpoint数量设置为2。上图中展示了多次checkpoints时:本地RocksDB目录状态,checkpoint引用的文件,checkpoint完成后shared state registry的文件引用计数。
对于Checkpoint1:本地目录包含两个sstable,作为新文件会被上传到可靠存储中,目录的名称与Checkpoint的名称对应。当Checkpoint完成后,Flink会在shared state registry中创建两个entry,key为operator,subtask以及原始sstable的文件名,并将其count值设置为1。同时registry还会保存key到可靠存储文件路径的映射。
对于Checkpoint2:RocksDB创建了两个新的sstable,同时两个老的sstable依然存在。Flink将两个新的sstable上传到可靠存储,同时引用两个老的sstable,当checkpoint结束时将registry中所有被引用文件的count加1。
对于Checkpoint3:RocksDB compaction将sstable-(1),sstable-(2),sstable-(3)合并成sstable-(1,2,3),并删除原始文件。合并后的文件所清理了重复项,包含的内容也与源文件一致。除了被合并的文件,sstable-(4)依然存在,另外还有一个新生成的sstable-(5)。Flink将sstable-(1,2,3)和sstable-(5)上传至可靠存储,sstable-(4)被Checkpoint2重新引用,count加一。更老的Checkpoint1因为超出指定的retain版本数则会被删除,所有被Checkpoint1引用的文件count也会被减1。
对于Checkpoint4:RocksDB compaction将sstable-(4), sstable-(5),sstable-(6)合并成sstable-(4,5,6)。Flink将新文件上传到可靠存储,并与sstable-(1,2,3)一同引用,sstable-(1,2,3)和sstable-(4,5,6)的引用count加1,然后删除超出保留数量的Checkpoint2。因为sstable-(1), sstable-(2), sstable-(3)的引用count变为0,Flink会从可靠存储中删除这些文件。
竞争条件和并发Checkpoint,
Flink可以同时执行多个checkpoint,有时候上一次checkpoint还没有结束新的checkpoint已经开始,因此需要考虑使用哪一个历史checkpoint作为新checkpoint的增量数据的basis。Flink只会引用已经被checkpoint coordinator确认的checkpoint,所以不会在无意间引用到已经被删除的文件。
Checkpoint恢复和性能考量
如果你打开了增量checkpoint,故障恢复就不再需要更多的配置。一旦故障发生,Flink的JobManager会告诉所有的task从最近完成的checkpoint进行恢复,无论这个检查点是增量的还是全量的。每一个TaskManager会从持久化存储中下载各自的状态。
尽管这一特性对大状态的checkpoint时间有实质性缩短,还是有一些权衡需要考虑。总体上来讲checkpoint的时间缩短了但也导致恢复时间更长了(取决于状态大小)。如果集群故障频发,且TaskManager需要读取多个checkpoint,则恢复速度比非增量checkpoint要慢。同时你还无法删除被新checkpoint引用的历史checkpoint,这会导致checkpoint之间的差异会随着时间无限增大。你需要更大的分布式存储来保存和更高的带宽来读取checkpoint。
这里有一些策略用来对易用性和性能进行平衡,建议查看Flink官方文档。
原文地址:https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html