Flink端到端Exactly-Once概述

  此文章改编自Piotr Nowojski’s presentation from Flink Forward Berlin 2017,你可以从Flink Forward柏林站找到ppt和演讲录音。

  Apache Flink 1.4.0发布于2017年12月,为流式处理引入了一个重要的里程碑:TwoPhaseCommitSinkFunction(相关jira单),通过将两阶段提交协议的通用逻辑进行抽取,使得使用Flink和包括kafka等一系列source,sink来开发端到端Exactly-Once的应用成为可能。它提供了一层抽象,用户只需要实现几个方法就能够实现端到端Exactly-Once语义。

  如果你只想了解这么多,让我们将您指向Flink文档中相关的位置,关于如何使用TwoPhaseCommitSinkFunction。

  但是如果您想了解更多,在这篇文章中,我们会分享新功能的深入概述,以及Flink幕后所发生的事情。

在这篇文章的剩余部分,我们会:

  • 描述checkpoints在保证exactly-once结果中所扮演的角色。
  • 展示Flink是如何通过两阶段提交与source和sink进行交互来实现端到端exactly-once语义的。
  • 通过一个简单的例子来了解如何使用TwoPhaseCommitSinkFunction来实现一个支持exactly-once的file sink。

Flink应用中的Exactly-once语义

  当我们说“exactly-once语义”时,意味着所有输入event对最终结果的影响只有一次。即使出现软硬件故障,也不能有数据重复或丢失。

  Flink长期以来一直提供内部的exactly-once语义。在过去的几年中,我们深入的介绍了Flink checkpoint的原理,这是Flink支持exactly-once语义的核心能力。这篇Flink文档也提供了这一功能的概述.

  在继续之前,为了理解更广泛的主题,这里快速总结一下checkpoint算法。

Flink中一个checkpoint是一个一致性快照,包含:

  1. 应用当前的状态
  2. 输入数据流当前的读取位置

  Flink 会根据配置定期产出checkpoint,然后将其写入到诸如S3或HDFS这类持久化存储系统中。写入存储的过程可以是异步进行的,也就意味着Flink应用可以再chckpoint过程中继续处理数据。

  当出现软硬件故障并重启是,Flink应用会从最近一次成功完成的checkpoint恢复;Flink会恢复应用的状态并将输入流读取位置后才才开始继续处理数据。这也就意味着Flink会像异常从没发生过一样计算结果。

  在Flink 1.4.0之前,exactly-once语义仅限于Flink应用本身,并不能扩展到Flink结果写入的外部系统。

  但Flink应用通常会关联一系列的sink组件来运行,开发者需要能够在一个组件的上下文之外保持exactly-once语义。

  为了提供端到端的exactly-once语义,即:也适用于外部系统的exactly-once语义,除了Flink应用状态之外,这些外部系统需要提供一种与Flink状态相协调的提交和回滚的方法。

  分布式系统中常用的协调提交和回滚的方式就是两阶段提交协议。下一章节我们会讨论Flink的TwoPhaseCommitSinkFunction如何使用该协议来实现端到端exactly-once语义的。

使用 Apache Flink 的端到端 Exactly Once 应用程序

  我们会简单介绍两阶段提交协议,以及如何在一个读写Kafka的Flink应用中开启端到端Exactly Once语义。Kafka是可以与Flink一同使用的广受欢迎的消息系统,而且在0.11版本中增加了对事务的支持。这就意味着Flink在读写Kafka的应用中拥有支持端到端Exactly Once语义所必需的机制

  Flink 对于支持端到端Exactly Once语义的支持并不局限于Kafka,你可以使用任何提供必须协调机制的source/sink。例如Pravega,一个Dell/EMC开源的流式存储系统,同时也可以通过TwoPhaseCommitSinkFunction来实现端到端Exactly Once。

在要讨论的示例程序中包含:

  • 一个读取Kafka的data source,也就是Flink中的KafkaConsumer
  • 一个窗口聚合
  • 一个雪茹Kafka的data sink,也就是Flink中的KafkaProducer

  Kafka sink要提供exactly-once保证就必须通过事务写入数据,提交时包含两次checkpoint之间的所有数据。

  这保证了失败时写入能够回滚。

  然而,在分布式系统中有多个并发执行的sink任务,简单的提交回滚是不够的,因为所有的组件必须都同意提交或回滚来保证结果的一致性。Flink使用两阶段提交协议和预提交截断来应对这一挑战。

  Checkpoint开始代表着两阶段提交协议的“预提交”节点。当checkpoint开始,Flink JobManager注入一个checkpoint barrier(用来把数据流中的数据分割到当前和下一次checkpoint)到数据流中。

  barrier会通过上游算子传递到下游算子,每到一个算子就会触发该算子的state backend对当前算子状态生成一个快照。

  Data source会存储Kafka的offset,完成后将barrier传递到下一个算子。

  在算子只有内部状态时,这种方式是可以正常工作的。内部状态指的是由Flink state backends存储和管理的所有数据,例如:第二个算子中窗口的sum值。当只有内部状态时,除了更新状态数据,预提交阶段不需要任何额外的操作。Flink会处理这些写入的提交或回滚。

  然而,当有外部状态存在时,这个状态就要被区别化处理了。外部状态通常是以写入诸如Kafka这类外部系统的形式出现。在这种情况下,为了提供exactly-once保证,外部系统需要支持事务用于嵌入两阶段提交协议中。

  例子中的data sink因为需要写入数据到kafka中也存在外部状态。这种情况下,在预提交阶段data sink除了写入状态到state backend外,还必须预提交外部事务。


  当barrier流过所有算子并且触发的snapshot完成后,预提交阶段结束。此时checkpoint成功,checkpoint包含整个应用的状态以及预提交的外部状态。一旦发生错误,我们可以从这个checkpoint重新加载应用。

  下一步就是通知所有算子checkpoint成功完成。这是两阶段提交协议的提交阶段,JobManager会对每个算子发起checkpoint完成回调。Data source和 window算子没有外部状态,所以在commit阶段,这些算子不用执行任何操作。但是Data sink算子有外部状态,所以需要提交外部写入事务。

把所有部分做个汇总:

  • 当所有算子完成预提交后会进行提交。
  • 任意一个预提交失败,所有算子回滚到前一次成功的checkpoint。
  • 当一个预提交成功后,算子和外部系统必须保证提交阶段最终成功。如果一个提交失败了(例如网络间歇性问题),整个应用要失败,并根据用户指定的重启策略进行重启,然后继续尝试提交。这个过程是至关重要的,因为一旦提交最终没有成功,数据会发生丢失。

  因此,我们可以确定所有算子都对checkpoint的最终结果达成一致:数据被提交,或放弃提交并回滚。

在Flink中实现两阶段提交算子

  实现两阶段提交协议所需的逻辑有些复杂,所以Flink把这些公共逻辑抽象成了TwoPhaseCommitSinkFunction类。

  让我们基于一个简单的基于文件的示例来讨论如何扩展TwoPhaseCommitSinkFunction。对于一个exactly-once file sink 我们只需要实现四个方法:

  1. beginTransaction - 用来开始事务,我们在目标文件系统的临时目录中创建一个临时文件。随后我们就可以写入数据到该文件。
  2. preCommit - 在预提交阶段,我们flush数据并关闭临时文件,并不会再写入这个文件。然后我们开启一个新的事务用于后续的checkpoint所拥有数据的写入。
  3. commit - 当提交是,我们原子性移动预提交后的文件到目标目录。请注意这会增加输出数据的可见性延迟。
  4. abort - 在放弃时,我们删除临时文件。

  众所周知,如果出现任何故障,Flink会从最近成功的checkpoint中恢复应用状态。极少数情况下,一个潜在的问题是在预提交已经成功,但提交通知尚未到达算子之前。这种情况下,Flink会恢复算子状态到预提交状态。

  我们必须在状态中存储足够的预提交事务信息,以便于在重启后提交或者放弃事务。在我们的例子中就是临时文件和目标文件夹的路径。

  TwoPhaseCommitSinkFunction考虑了这种情况,从检查点恢复时总会抢先进行一次提交。我们所要做的则是实现一个幂等的提交方式。通常来说这不是问题。在我们的例子中,我们可以识别这种情况:临时文件不再临时目录,而是已经被移动到了目标目录。

  TwoPhaseCommitSinkFunction还考虑了其他的一些边缘情况。可以从Flink文档中了解更多。

总结

文章涉及到的关键点:

  • Flink的checkpoint系统是Flink支持两阶段提交和端到端exactly-once语义的基础。
  • 这种方式的优势在于:Flink不需要像其他系统那样物化传输中的数据,即不需要批处理一样将每个阶段的计算写入硬盘。
  • Flink的TwoPhaseCommitSinkFunction抽象了两阶段提交协议的公共逻辑,使得Flink和支持事务的外部系统可以构建端到端exactly-once应用。
  • 从Flink 1.4.0开始,Pravega和Kafka 0.11 producers提供exactly-once语义。
  • Kafka 0.11 producer基于TwoPhaseCommitSinkFunction构建,相比于at-least-once Kafka producer开销更低。

我们对这项新功能的实现非常激动,也期待未来通过TwoPhaseCommitSinkFunction来支持更多的producer。

原文地址https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注