多读书多实践,勤思考善领悟

Flink内部原理

本文于1682天之前发表,文中内容可能已经过时。

一、组件堆栈

作为软件堆栈,Flink是一个分层系统。堆栈的不同层构建在彼此之上,并提高它们接受的程序表示的抽象级别:

  • 运行时层中的形式接收节目JobGraph。JobGraph是一个通用的并行数据流,具有消耗和生成数据流的任意任务。
  • 二者的数据流中的API数据集API生成通过单独的编译过程JobGraphs。DataSet API使用优化器来确定程序的最佳计划,而DataStream API使用流构建器。
  • JobGraph根据Flink中提供的各种部署选项执行(例如,本地,远程,YARN等)
  • 与Flink捆绑在一起的库和API生成DataSet或DataStream API程序。这些是用于逻辑表的查询的表,用于机器学习的FlinkML和用于图形处理的Gelly。

Apache Flink:Stack

二、数据流容错

描述Flink的流数据流容错机制。

介绍

Apache Flink提供容错机制,以持续恢复数据流应用程序的状态。该机制确保即使存在故障,程序的状态最终也只能反映数据流中的每条记录一次。请注意,有一个开关保证降级**至少一次 (如下所述)。

容错机制连续地绘制分布式流数据流的SNAPSHOT。对于状态较小的流应用程序,这些SNAPSHOT非常轻量级,可以经常绘制,而不会对性能产生太大影响。流应用程序的状态存储在可配置的位置(例如主节点或HDFS)。

如果程序失败(由于机器,网络或软件故障),Flink将停止分布式流数据流。然后,系统重新启动算子并将其重置为最新的成功检查点。输入流将重置为状态SNAPSHOT的点。作为重新启动的并行数据流的一部分处理的任何记录都保证不会成为先前检查点状态的一部分。

注意:默认情况下,禁用检查点。

注意:要使此机制实现其完全保证,数据流源(例如消息队列或代理)需要能够将流回滚到定义的最近点。Apache Kafka具有这种能力,Flink与Kafka的连接器利用了这种能力。

注意:由于Flink的检查点是通过分布式SNAPSHOT实现的,因此我们可以互换使用SNAPSHOT检查点

检查点

Flink的容错机制的核心部分是绘制分布式数据流和算子状态的一致SNAPSHOT。这些SNAPSHOT充当一致的检查点,系统可以在发生故障时退回。Flink用于绘制这些SNAPSHOT的机制在“ 分布式数据流的轻量级异步SNAPSHOT ”中进行了描述。它受到分布式SNAPSHOT的标准Chandy-Lamport算法的启发,专门针对Flink的执行模型而定制。

障碍

Flink分布式SNAPSHOT的核心数据元是流障碍。这些障碍被注入数据流并与记录一起作为数据流的一部分流动。障碍永远不会超过记录,流量严格符合要求。屏障将数据流中的记录分为进入当前SNAPSHOT的记录集和进入下一个SNAPSHOT的记录。每个屏障都带有SNAPSHOT的ID,该SNAPSHOT的记录在其前面推送。障碍不会中断流的流动,因此非常轻。来自不同SNAPSHOT的多个障碍可以同时在流中,这意味着可以同时发生各种SNAPSHOT。

数据流中的检查点障碍

流障碍被注入流源的并行数据流中。注入SNAPSHOTn的障碍(我们称之为S n)的点是源流中SNAPSHOT覆盖数据的位置。例如,在Apache Kafka中,此位置将是分区中最后一条记录的偏移量。该位置S n被报告给检查点协调员(Flink的JobManager)。

然后障碍物向下游流动。当中间算子从其所有输入流中收到SNAPSHOTn的屏障时,它会为其所有输出流中的SNAPSHOTn发出屏障。一旦接收器算子(流式DAG的末端)从其所有输入流接收到障碍n,它就向SNAPSHOTn确认检查点协调器。在所有接收器确认SNAPSHOT之后,它被认为已完成。

一旦完成SNAPSHOTn,作业将永远不再向源请求来自S n之前的记录,因为此时这些记录(及其后代记录)将通过整个数据流拓扑。

在具有多个输入的 算子处对齐数据流

接收多个输入流的 算子需要在SNAPSHOT屏障上对齐输入流。上图说明了这一点:

  • 一旦算子从输入流接收到SNAPSHOT屏障n,它就不能处理来自该流的任何其他记录,直到它从其他输入接收到屏障n为止。否则,它会混合属于SNAPSHOTn的记录和属于SNAPSHOTn + 1的记录
  • 报告障碍n的流暂时被搁置。从这些流接收的记录不会被处理,而是放入输入缓冲区。
  • 一旦最后一个流接收到屏障n,算子就会发出所有挂起的传出记录,然后自己发出SNAPSHOTn个屏障。
  • 之后,它恢复处理来自所有输入流的记录,在处理来自流的记录之前处理来自输入缓冲区的记录。

状态

当 算子包含任何形式的状态时,此状态也必须是SNAPSHOT的一部分。算子状态有不同的形式:

  • 用户定义的状态:这是由转换函数(如map()filter())直接创建和修改的状态。
  • 系统状态:此状态是指作为 算子计算一部分的数据缓冲区。此状态的典型示例是窗口缓冲区,系统在其中收集(和聚合)窗口记录,直到窗口被评估和逐出。

算子在他们从输入流接收到所有SNAPSHOT障碍时,以及在向其输出流发出障碍之前对其状态进行SNAPSHOT。此时,将根据障碍之前的记录对状态进行所有更新,并且在应用障碍之后不依赖于记录的更新。由于SNAPSHOT的状态可能很大,因此它存储在可配置的状态后台中。默认情况下,这是JobManager的内存,但对于生产使用,应配置分布式可靠存储(例如HDFS)。在存储状态之后,算子确认检查点,将SNAPSHOT屏障发送到输出流中,然后继续。

生成的SNAPSHOT现在包含:

  • 对于每个并行流数据源,启动SNAPSHOT时流中的偏移/位置
  • 对于每个 算子,指向作为SNAPSHOT的一部分存储的状态的指针

检查点机制的例证

完全一次与至少一次

对齐步骤可以增加流式传输程序的等待时间。通常,这种额外的延迟大约为几毫秒,但我们已经看到一些异常值的延迟显着增加的情况。对于要求所有记录始终具有超低延迟(几毫秒)的应用程序,Flink可以在检查点期间跳过流对齐。一旦算子看到每个输入的检查点障碍,仍然会绘制检查点SNAPSHOT。

当跳过对齐时,即使在检查点n的某些检查点障碍到达之后,算子仍继续处理所有输入。这样,算子还可以在获取检查点n的状态SNAPSHOT之前处理属于检查点n + 1的数据元。在还原时,这些记录将作为重复记录出现,因为它们都包含在检查点n的状态SNAPSHOT中,并将在检查点n之后作为数据的一部分进行重放。

注意:对齐仅适用于具有多个前驱(连接)的 算子以及具有多个发送方的 算子(在流重新分区/随机播放之后)。正因为如此,数据流只有尴尬的并行流 算子操作(map()flatMap()filter(),…)实际上给正好一次保证了即使在至少一次模式。

异步状态SNAPSHOT

注意,上述机制意味着算子在将状态的SNAPSHOT存储在状态后台时停止处理输入记录。每次拍摄SNAPSHOT时,此同步状态SNAPSHOT都会引入延迟。

可以让算子在存储状态SNAPSHOT时继续处理,有效地让状态SNAPSHOT在后台异步发生。为此,算子必须能够生成一个状态对象,该状态对象应以某种方式存储,以便对算子状态的进一步修改不会影响该状态对象。例如,诸如RocksDB中使用的写时复制数据结构具有这种行为。

在接收到输入的检查点障碍后,算子启动其状态的异步SNAPSHOT复制。它立即释放其输出的障碍,并继续进行常规流处理。后台复制过程完成后,它会向检查点协调员(JobManager)确认检查点。检查点现在仅在所有接收器都已收到障碍并且所有有状态算子已确认其完成备份(可能在障碍物到达接收器之后)之后才完成。

复苏

在这种机制下的恢复是直截了当的:当失败时,Flink选择最新完成的检查点k。然后,系统重新部署整个分布式数据流,并为每个算子提供作为检查点k的一部分进行SNAPSHOT的状态。设置源以开始从位置S k读取流。例如,在Apache Kafka中,这意味着告诉消费者从偏移量S k开始提取。

如果状态以递增方式SNAPSHOT,则算子从最新完整SNAPSHOT的状态开始,然后对该状态应用一系列增量SNAPSHOT更新。

算子SNAPSHOT实施

在执行算子SNAPSHOT时,有两部分:同步异步部分。

算子和状态后台将其SNAPSHOT作为Java提供FutureTask。该任务包含完成同步部分且异步部分处于挂起状态的状态。然后,异步部分由该检查点的后台线程执行。

检查点纯粹同步返回已经完成的算子FutureTask。如果需要执行异步 算子操作,则以该run()方法执行FutureTask

任务是可取消的,因此可以释放流和其他资源消耗句柄。

三、工作和调度

简要介绍Flink如何调度作业及其如何表示和跟踪JobManager上的作业状态。

调度

Flink中的执行资源通过任务槽定义。每个TaskManager都有一个或多个任务槽,每个槽都可以运行一个并行任务管道。流水线由多个连续的任务,如在 第n一MapFunction的连同并行实例第n一ReduceFunction的并行实例。请注意,Flink经常同时执行连续任务:对于流程序,无论如何都会发生,但对于批处理程序,它经常发生。

下图说明了这一点。考虑一个带有数据源,MapFunctionReduceFunction的程序。源和MapFunction以4的并行度执行,而ReduceFunction以3的并行度执行。管道由序列Source - Map - Reduce组成。在具有2个TaskManagers且每个具有3个插槽的群集上,程序将按如下所述执行。

将任务管道分配给插槽

在内部,Flink限定通过SlotSharingGroupCoLocationGroup 哪些任务可以共享的狭槽(许可),分别哪些任务必须严格放置到相同的时隙。

JobManager数据结构

在作业执行期间,JobManager会跟踪分布式任务,决定何时安排下一个任务(或一组任务),并对已完成的任务或执行失败做出反应。

JobManager接收JobGraph,它是由 算子(JobVertex)和中间结果(IntermediateDataSet)组成的数据流的表示。每个 算子都具有属性,例如并行性和它执行的代码。此外,JobGraph还有一组附加库,这些库是执行算子代码所必需的。

JobManager将JobGraph转换为ExecutionGraph。ExecutionGraph是JobGraph的并行版本:对于每个JobVertex,它包含每个并行子任务的ExecutionVertex。并行度为100的 算子将具有一个JobVertex和100个ExecutionVertices。ExecutionVertex跟踪特定子任务的执行状态。来自一个JobVertex所有ExecutionVertices都保存在 ExecutionJobVertex中,它跟踪整个算子的状态。除了顶点之外,ExecutionGraph还包含IntermediateResultIntermediateResultPartition。前者跟踪IntermediateDataSet的状态,后者是每个分区的状态。

JobGraph和ExecutionGraph

每个ExecutionGraph都有一个与之关联的作业状态。此作业状态指示作业执行的当前状态。

Flink作业首先处于创建状态,然后切换到运行,并在完成所有工作后切换到已完成。如果出现故障,作业将首先切换为取消所有正在运行的任务的失败。如果所有作业顶点都已达到最终状态且作业无法重新启动,则作业将转换为失败。如果可以重新启动作业,则它将进入重新启动状态。作业完全重新启动后,将达到创建状态。

如果用户取消作业,它将进入取消状态。这还需要取消所有当前正在运行的任务。一旦所有正在运行的任务都达到最终状态,作业将转换为已取消的状态。

完成取消失败的状态不同,它表示全局终端状态,因此触发清理作业,暂停状态仅在本地终端。本地终端意味着作业的执行已在相应的JobManager上终止,但Flink集群的另一个JobManager可以从持久性HA存储中检索作业并重新启动它。因此,到达暂停状态的作业将不会被完全清除。

Flink工作的状态和转型

在执行ExecutionGraph期间,每个并行任务都经历多个阶段,从创建完成失败。下图说明了它们之间的状态和可能的转换。可以多次执行任务(例如,在故障恢复过程中)。因此,在Execution中跟踪ExecutionVertex执行。每个ExecutionVertex都有一个当前的Execution和先前的Executions。

四、任务生命周期

Flink中的任务是执行的基本单位。它是执行 算子的每个并行实例的位置。例如,并行度为5的 算子将使每个实例由单独的任务执行。

StreamTask是Flink流处理中所有不同任务子类型的基础。本文档StreamTask介绍了生命周期中的不同阶段,并描述了代表这些阶段的主要方法。

简而言之,算子生命周期

因为任务是执行 算子的并行实例的实体,所以它的生命周期与 算子的生命周期紧密集成。因此,我们将简要提及代表算子生命周期的基本方法,然后再深入了解算子的生命周期StreamTask。下面按照调用每个方法的顺序列出了该列表。假设 算子可以具有用户定义的函数(UDF),则在每个 算子方法下面,我们还会在它调用的UDF的生命周期中呈现(缩进)方法。如果 算子扩展了这些方法,则这些方法可用AbstractUdfStreamOperator,这是执行UDF的所有 算子的基本类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// initialization phase
OPERATOR::setup
UDF::setRuntimeContext
OPERATOR::initializeState
OPERATOR::open
UDF::open

// processing phase (called on every element/watermark)
OPERATOR::processElement
UDF::run
OPERATOR::processWatermark

// checkpointing phase (called asynchronously on every checkpoint)
OPERATOR::snapshotState

// termination phase
OPERATOR::close
UDF::close
OPERATOR::dispose

简而言之,setup()调用它来初始化一些特定于算子的机制,例如它RuntimeContext及其度量集合数据结构。在此之后,initializeState()为算子提供其初始状态,并且该 open()方法执行任何特定于算子的初始化,例如在该情况下打开用户定义的函数AbstractUdfStreamOperator

注意initializeState()同时包含它的初始执行过程中初始化 算子操作者的状态的逻辑(例如寄存器任何键入的状态),并且还从在发生故障后的检查点检索其状态的逻辑。有关此内容的更多信息,请参见本页的其余部

现在一切都已设置,算子已准备好处理传入的数据。传入数据元可以是以下之一:输入数据元,水印和检查点障碍。它们中的每一个都有一个特殊的数据元来处理它。数据元由processElement()方法处理,水印由processWatermark()和检查点障碍触发一个检查点,该检查点调用(异步)snapshotState()方法,我们在下面描述。对于每个传入数据元,根据其类型调用上述方法之一。请注意,processElement()它也是调用UDF逻辑的位置,例如map()您的方法MapFunction

最后,在算子正常,无故障终止的情况下(例如,如果流是有限的并且到达其结束),close()则调用该方法以执行算子逻辑所需的任何最终副本记录 算子操作(例如, 关闭任何连接)或者在算子执行期间打开的I / O流),dispose()之后调用它以释放算子持有的任何资源(例如算子数据所持有的本机内存)。

在由于故障或由于手动取消而终止的情况下,执行直接跳转到dispose() 并且跳过算子在故障发生时所处的阶段与之间的任何中间阶段dispose()

检查点:snapshotState()只要收到检查点障碍,就会调用算子的方法与上述其他方法异步。检查点在处理阶段执行,在算子打开之后和关闭之前执行。此方法的职责是将算子的当前状态存储到指定的状态后台,当作业在失败后恢复执行时将从该后台检索该状态。

任务生命周期

在简要介绍了算子的主要阶段之后,本节将更详细地描述任务如何在群集上执行期间调用相应的方法。这里描述的阶段的顺序主要包括在该类的invoke()方法中StreamTask。本文档的其余部分分为两个小节,一个描述在任务的常规无故障执行期间的阶段,以及(更短的)一个描述在任务被取消时遵循的不同序列,手动或由于某些其他原因,例如执行期间抛出的异常。

正常执行

执行任务直到完成而不被中断的步骤如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
TASK::setInitialState
TASK::invoke
create basic utils (config, etc) and load the chain of operators
setup-operators
task-specific-init
initialize-operator-states
open-operators
run
close-operators
dispose-operators
task-specific-cleanup
common-cleanup

如上所示,在恢复任务配置并初始化一些重要的运行时参数之后,该任务的第一步是检索其初始的任务范围状态。这是在setInitialState(),这在两种情况下尤其重要:

  1. 当任务从故障中恢复并从上一个成功检查点重新启动时
  2. 从保存点恢复时。

如果是第一次执行任务,则初始任务状态为空。

恢复任何初始状态后,任务进入其invoke()方法。在那里,它首先通过调用setup()它们中的每一个的方法来初始化参与本地计算的 算子,然后通过调用本地init()方法来执行其任务特定的初始化。通过特定的任务,我们的意思是根据任务(类型SourceTaskOneInputStreamTaskTwoInputStreamTask等),这个步骤可能会有所不同,但在任何情况下,这里是必要的任务范围内获取资源。作为示例,OneInputStreamTask表示期望具有单个输入流的任务,将连接初始化为与本地任务相关的输入流的不同分区的位置。

获得必要的资源后,不同的算子和用户定义的函数就可以从上面检索的任务范围状态获取其各自的状态。这是在initializeState()方法中完成的,该方法调用initializeState()每个单独的 算子。每个有状态 算子都应该覆盖此方法,并且应该包含状态初始化逻辑,这两者都是第一次执行作业,也适用于任务从故障中恢复或使用保存点时的情况。

现在任务中的所有算子都已初始化,open()每个单独的算子的openAllOperators()方法都由该方法调用StreamTask。此方法执行所有 算子操作初始化,例如使用计时器服务注册任何检索到的计时器。单个任务可能正在执行多个 算子,其中一个 算子消耗其前任的输出。在这种情况下,open()从最后一个 算子(其输出也是任务本身的输出的 算子)调用该方法到第一个 算子。这样做是为了当第一个算子开始处理任务的输入时,所有下游算子都准备好接收其输出。

注意任务中的连续算子从最后一个到第一个打开。

现在任务可以恢复执行,算子可以开始处理新的输入数据。这run() 是调用特定于任务的方法的位置。此方法将一直运行,直到没有更多输入数据(有限流),或任务被取消(手动或不手动)。这是 调用 算子特定processElement()processWatermark()方法的位置。

在运行直到完成的情况下,没有更多的输入数据要处理,在退出该run() 方法之后,任务进入其关闭过程。最初,定时器服务停止注册任何新的定时器(例如,从正在执行的触发定时器),清除所有尚未启动的定时器,并等待当前正在执行的定时器的完成。然后closeAllOperators()尝试通过调用close()每个 算子的方法来优雅地关闭计算中涉及的运算符。然后,刷新任何缓冲的输出数据,以便下游任务可以处理它们,最后任务尝试通过调用它来清除算子持有的所有资源。 dispose()每个人的方法。打开不同的算子时,我们提到订单是从最后一个到第一个。结束以相反的方式发生,从头到尾。

注意任务中的连续算子从第一个到最后一个关闭。

最后,当所有算子都已关闭并释放所有资源时,任务会关闭其计时器服务,执行特定于任务的清理,例如清除其所有内部缓冲区,然后执行其通用任务清理,其中包括关闭所有 算子操作输出通道和清理任何输出缓冲区。

检查点:以前我们看到过,在initializeState()从故障中恢复的过程中,任务及其所有算子和函数检索在故障之前的最后一个成功检查点期间持久保存到稳定存储的状态。Flink中的检查点是根据用户指定的间隔定期执行的,并且由与主任务线程不同的线程执行。这就是为什么它们不包含在任务生命周期的主要阶段中。简而言之,调用的特殊数据元CheckpointBarriers由输入数据流中的作业的源任务定期注入,并随实际数据从源传递到接收器。源任务在运行模式后注入这些障碍,并假设为CheckpointCoordinator也在运行。每当任务接收到这样的障碍时,它就会调度由检查点线程执行的任务,该线程调用 snapshotState()任务中的 算子。在执行检查点时,任务仍然可以接收输入数据,但是数据被缓冲并且仅在检查点成功完成后处理并向下游发射。

执行中断

在前面的部分中,我们描述了一直运行到完成的任务的生命周期。如果任务在任何时候被取消,则正常执行被中断,从那一点开始执行的唯一 算子操作是定时器服务关闭,特定于任务的清理,算子的处理以及一般任务清理,如如上所述。

五、文件系统

Flink通过org.apache.flink.core.fs.FileSystem类有自己的文件系统抽象。这种抽象提供了一组通用的 算子操作,并且在各种类型的文件系统实现中提供了最小的保证。

FileSystem可用 算子操作的设置是非常有限的,为了支持多种文件系统。例如,不支持附加或更改现有文件。

文件系统是由识别的文件系统格式,如file://hdfs://等。

实现

Flink使用以下文件系统方案直接实现文件系统:

  • file,表示机器的本地文件系统。

其他文件系统类型由桥接到Apache Hadoop支持的文件系统套件的实现访问 。以下是不完整的示例列表:

  • hdfs:Hadoop分布式文件系统
  • s3s3ns3a:亚马逊S3文件系统
  • gcs:Google云端存储
  • maprfs:MapR分布式文件系统

如果在类路径中找到Hadoop文件系统类并找到有效的Hadoop配置,Flink将透明地加载Hadoop的文件系统。默认情况下,它在类路径中查找Hadoop配置。或者,可以通过配置条目指定自定义位置fs.hdfs.hadoopconf

坚持保证

这些FileSystem及其FsDataOutputStream实例用于持久存储数据,既用于应用程序的结果,也用于容错和恢复。因此,必须明确定义这些流的持久性语义。

持久性保证的定义

如果满足两个要求,则写入输出流的数据被认为是持久的:

  1. 可见性要求:必须保证能够访问该文件的所有其他进程,计算机,虚拟机,容器等在给定绝对文件路径时始终看到数据。此要求类似于 POSIX定义的close-to-open语义,但仅限于文件本身(通过其绝对路径)。
  2. 持久性要求:必须满足文件系统的特定持久性/持久性要求。这些特定于特定文件系统。例如,{@link LocalFileSystem}不为硬件和 算子操作系统的崩溃提供任何持久性保证,而复制的分布式文件系统(如HDFS)通常在出现n个并发节点故障时保证持久性,其中n是复制因子。

不需要更新文件的父目录(以便在列出目录内容时显示文件),以使文件流中的数据被认为是持久的。对于目录内容的更新最终只是一致的文件系统来说,这种放松很重要。

FSDataOutputStream必须保证数据的持久性,一旦调用写入的字节 FSDataOutputStream.close()回报。

例子

  • 对于容错的分布式文件系统,一旦数据被文件系统接收和确认,数据就被认为是持久的,通常是通过复制到法定数量的机器(持久性要求)。此外,绝对文件路径必须对可能访问该文件的所有其他计算机可见(可见性要求)。

    数据是否已达到存储节点上的非易失性存储取决于特定文件系统的特定保证。

    对文件的父目录的元数据更新不需要达到一致状态。允许某些机器在列出父目录的内容时看到该文件而其他机器没有,只要在所有节点上都可以通过其绝对路径访问该文件。

  • 一个本地文件系统必须支持POSIX 贴近开放语义。由于本地文件系统没有任何容错保证,因此不存在进一步的要求。

    以上特别暗示,当从本地文件系统的角度考虑持久性时,数据可能仍然在OS高速缓存中。导致 算子操作系统缓存丢失数据的崩溃对本地计算机来说是致命的,并且不受Flink定义的本地文件系统保证的影响。

    这意味着仅保证可以从本地计算机的故障中恢复仅写入本地文件系统的计算结果,检查点和保存点,从而使本地文件系统不适合生产设置。

更新文件内容

许多文件系统根本不支持覆盖现有文件的内容,或者在这种情况下不支持更新内容的一致可见性。因此,Flink的FileSystem不支持附加到现有文件,也不支持在输出流中搜索,以便可以在同一文件中更改以前写入的数据。

覆盖文件

通常可以覆盖文件。通过删除文件并创建新文件来覆盖文件。但是,某些文件系统无法使有权访问该文件的所有各方同步显示该更改。例如,Amazon S3仅保证文件替换可见性的最终一致性:某些计算机可能会看到旧文件,某些计算机可能会看到新文件。

为了避免这些一致性问题,Flink中的故障/恢复机制的实现严格避免多次写入同一文件路径。

线程安全

实现FileSystem必须是线程安全的:相同的实例FileSystem经常在Flink中的多个线程之间共享,并且必须能够同时创建输入/输出流和列表文件元数据。

这些FSDataOutputStreamFSDataOutputStream实现严格地说不是线程安全的。在读取或写入 算子操作之间的线程之间也不应传递流的实例,因为不能保证跨线程的 算子操作的可见性(许多 算子操作不会创建内存防护)。