这篇文章大致介绍Flink如何调度job以及JobManager是如何表示并追踪其状态的。
调度
Flink中的执行资源通过Task Slots来定义。每一个TaskManager拥有一个或多个task slots,每个slot可以执行并行任务中的一个pipeline。一个pipeline包含多个连续任务,例如第n个MapFunction和第n个ReduceFunction实例的组合。注意Flink经常并行执行连续任务:对于流处理程序十分常见,对于批处理程序也经常发生。
下图说明了这一点。设想一个程序有一个data source,一个MapFunction和一个ReduceFunction。Source和MapFunction按照4并行度执行,ReduceFunction按照并行度3执行。一个pipoline有Source-Map-Reduce的顺序组成。在一个有2个TaskManager,每个TaskManager有3个slot的集群,程序会按照如下方式运行:
在内部,Flink通过SlotSharingGroup和CoLocationGroup
定义那些任务可以共享一个slot,相对的哪些任务必须严格放到一个slot中。
JobManager数据结构
在job执行过程中,JobManager持续追踪分布式task来决定何时调度下一个task(或者一批task),并对已完成或失败的task做出应对。
JobManager接收表示数据流所包含的算子(JobVertex)和中间结果(intermediateDataSet)的JobGraph。每一个算子有属性,例如并行度和其所执行的代码。另外,JobGraph有一组运行算子代码所必须的附加库。
JobManager将JobGraph转换成ExecutionGraph。ExecutionGraph是一个并行版本的JobGraph:对于每一个JobVertex,它的每一个子任务都包含一个ExecutionVertex。一个100并行度的算子会有一个JobVertex和100个ExecutionVertex。ExecutionVertex会追踪一个特定子任务的运行状态。一个JobVertex的所有ExectionVertex由ExecutionJobVertex持有,ExecutionJobVertex追踪整个算子的状态。除了vertex,ExecutionGraph还包含IntermediateResult和IntermediateResultPartition。前者追踪IntermediateDataSet的状态,后者追踪每一个分区的状态。
每一个ExecutionGraph有一个job与之关联。这个job的状态指示job当前的执行状态。
一个Flink job首先是创建状态,然后切换到运行状态直到所有工作结束时切换到结束状态。一旦发生故障,job先切换到失败中状态,它会取消所有运行中的task。如果所有job到达最终状态且job无法被重启,则job被切换成失败状态。如果一个job可以被重启,则会进入重启中状态,一旦这个job完成重启就会进入创建状态。
如果用户取消job,则会变成取消中状态。这也需要取消当前正在运行的task。一旦所有运行的task进入最终状态,这个job切换到取消状态。
和结束,取消和失败状态表示全局终端状态并触发全局job清理不同,挂起状态只代表本地终端。本地终端意味着job在对应的JobManger已被终止,但Flink集群的另一个JobManager可以从持久性HA存储中读取并重启该job。所以一个到达挂起状态的job不会被完全清理。
在ExecutionGraph执行期间,每一个并行task通过创建到结束或失败的多个stage。下图揭示了这些stage以及之间可能的转换关系。一个task可能被多次执行(例如在故障恢复过程中)。因此,ExecutionVertex的执行过程被一个Execution追踪。每一个ExecutionVertex有一个当前的Execution和之前所有的Execution。
原文地址:https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/internals/job_scheduling/