升级应用程序和Flink版本
Flink DataStream程序通常设计为长时间运行,例如数周,数月甚至数年。与所有长期运行的服务一样,需要维护Flink流应用程序,包括修复错误,实施改进或将应用程序迁移到更高版本的Flink集群。
本文档介绍如何更新Flink流应用程序以及如何将正在运行的流应用程序迁移到其他Flink群集。
重新启动流应用程序
升级流应用程序或将应用程序迁移到其他群集的 算子操作系列基于Flink的Savepoint函数。保存点是特定时间点应用程序状态的一致SNAPSHOT。
有两种方法可以从正在运行的流应用程序中获取保存点。
- 采取保存点并继续处理。
> ./bin/flink savepoint <jobID> [pathToSavepoint]
建议定期获取保存点,以便能够从之前的某个时间点重新启动应用程序。
- 获取保存点并将应用程序作为单个 算子操作停止。
> ./bin/flink cancel -s [pathToSavepoint] <jobID>
这意味着应用程序在保存点完成后立即取消,即保存点后没有其他检查点。
给定从应用程序获取的保存点,可以从该保存点启动相同或兼容的应用程序(请参阅下面的“ 应用程序状态兼容性”部分)。从保存点启动应用程序意味着初始化其 算子的状态,并在保存点中保存 算子状态。这是通过使用保存点启动应用程序来完成的。
> ./bin/flink run -d -s [pathToSavepoint] ~/application.jar
启动应用程序的算子在获取保存点时使用原始应用程序的算子状态(即从中获取保存点的应用程序)进行初始化。启动的应用程序从这一点开始继续处理。
注意:即使Flink始终恢复应用程序的状态,它也无法恢复对外部系统的写入。如果从未停止应用程序的保存点恢复,则可能会出现问题。在这种情况下,应用程序可能在获取保存点后发出数据。重新启动的应用程序可能(取决于您是否更改了应用程序逻辑)再次发出相同的数据。根据SinkFunction
存储系统的Distinct,此行为的确切影响可能会有很大差异。如果对像Cassandra这样的键值存储进行幂等写入 算子操作,则发出两次的数据可能是正常的,但如果附加到像Kafka这样的持久日志中则会出现问题。无论如何,您应该仔细检查并测试重新启动的应用程序的行为。
应用状态兼容性
在升级应用程序以修复错误或改进应用程序时,通常的目标是在保存其状态的同时替换正在运行的应用程序的应用程序逻辑。我们通过从原始应用程序中获取的保存点启动升级的应用程序来完成此 算子操作。但是,这仅在两个应用程序都是状态兼容的情况下才有效,这意味着升级后的应用程序的 算子能够使用原始应用程序的 算子的状态初始化其状态。
在本节中,我们将讨论如何修改应用程序以保持状态兼容。
匹配算子状态
从保存点重新启动应用程序时,Flink会将保存点中存储的 算子状态与已启动应用程序的有状态 算子进行匹配。匹配基于算子ID完成,算子ID也存储在保存点中。每个 算子都有一个默认ID,该ID是从 算子在应用程序 算子拓扑中的位置派生而来的。因此,可以始终从其自己的保存点之一重新启动未修改的应用程序。但是,如果修改了应用程序,则 算子的默认ID可能会更改。因此,如果已明确指定了 算子ID,则只能从保存点启动已修改的应用程序。为 算子分配ID非常简单,使用以下uid(String)
方法完成:
val mappedEvents: DataStream[(Int, Long)] = events
.map(new MyStatefulMapFunc()).uid("mapper-1")
注意:由于存储在保存点中的 算子ID和要启动的应用程序中的 算子的ID必须相等,因此强烈建议为将来可能升级的应用程序的所有 算子分配唯一的ID。此建议适用于所有 算子,即具有和不具有显式声明的 算子状态的 算子,因为某些 算子具有用户不可见的内部状态。升级没有分配算子ID的应用程序要困难得多,并且可能只能通过使用该setUidHash()
方法的低级解决方法来实现。
重要提示:从1.3.x开始,这也适用于属于链的算子。
默认情况下,存储在保存点中的所有状态必须与启动应用程序的 算子匹配。但是,用户可以明确同意跳过(从而丢弃)从保存点启动应用程序时无法与算子匹配的状态。在保存点中找不到状态的有状态 算子将使用其默认状态进行初始化。
有状态 算子和用户函数
升级应用程序时,可以通过一个限制自由修改用户函数和算子。无法更改 算子状态的数据类型。这很重要,因为从保存点开始的状态在加载到 算子之前(当前)不能转换为不同的数据类型。因此,在升级应用程序时更改算子状态的数据类型会中断应用程序状态一致性,并阻止升级的应用程序从保存点重新启动。
算子状态可以是用户定义的,也可以是内部的。
用户定义的 算子状态:在具有用户定义的 算子状态的函数中,状态的类型由用户显式定义。虽然无法更改 算子状态的数据类型,但是克服此限制的解决方法可以是定义具有不同数据类型的第二个状态,并实现将状态从原始状态迁移到新状态的逻辑。这种方法需要良好的迁移策略和对Keys分区状态行为的充分理解。
内部 算子状态:窗口或连接 算子等 算子保持不向用户公开的内部 算子状态。对于这些 算子,内部状态的数据类型取决于 算子的输入或输出类型。因此,更改相应的输入或输出类型会中断应用程序状态一致性并阻止升级。下表列出了具有内部状态的 算子,并显示了状态数据类型与其输入和输出类型的关系。对于应用于被Key化的数据流的 算子,键类型(KEY)也始终是状态数据类型的一部分。
算子 | 内部算子状态的数据类型 |
---|---|
ReduceFunction [IOT] | 物联网(输入和输出类型)[,KEY] |
FoldFunction [IT,OT] | OT(输出类型)[,KEY] |
WindowFunction [IT,OT,KEY,WINDOW] | IT(输入类型),KEY |
AllWindowFunction [IT,OT,WINDOW] | IT(输入类型) |
JoinFunction [IT1,IT2,OT] | IT1,IT2(类型1和2.输入),KEY |
CoGroupFunction [IT1,IT2,OT] | IT1,IT2(类型1和2.输入),KEY |
内置聚合(sum,min,max,minBy,maxBy) | 输入类型[,KEY] |
应用拓扑
除了改变一个或多个现有 算子的逻辑之外,还可以通过更改应用程序的拓扑结构来升级应用程序,即通过添加或删除 算子,更改 算子的并行性或修改 算子链接行为。
通过更改其拓扑来升级应用程序时,需要考虑一些事项以保持应用程序状态的一致性。
- 添加或删除无状态 算子:除非以下情况之一适用,否则这没有问题。
- 添加有状态 算子: 算子的状态将使用默认状态初始化,除非它接管另一个 算子的状态。
- 删除有状态 算子:除非另一个 算子将其删除,否则删除的 算子的状态将丢失。启动升级后的应用程序时,您必须明确同意丢弃该状态。
- 更改 算子的输入和输出类型:在具有内部状态的 算子之前或之后添加新 算子时,必须确保不修改有状态 算子的输入或输出类型以保存内部 算子状态的数据类型(详见上文)。
- 更改算子链接:算子可以链接在一起以提高性能。从1.3.x以后的保存点恢复时,可以在保持状态一致性的同时修改链。有可能打破链条,使有状态的 算子移出链。还可以将新的或现有的有状态 算子附加或注入链中,或修改链中的 算子顺序。但是,将保存点升级到1.3.x时,拓扑在链接方面没有变化是至关重要的。应为链中一部分的所有 算子分配一个ID,如上面的匹配 算子状态部分所述。
升级Flink Framework版本
本节介绍了跨版本升级Flink以及在版本之间迁移作业的一般方法。
简而言之,此过程包括两个基本步骤:
- 在以前的旧Flink版本中获取要迁移的作业的保存点。
- 从先前获取的保存点恢复新Flink版本下的作业。
除了这两个基本步骤之外,还可能需要一些额外的步骤,这些步骤取决于您希望更改Flink版本的方式。在本指南中,我们区分了两种跨Flink版本升级的方法:就地升级和卷 影副本升级。
对于就地更新,在获取保存点后,您需要:
- 停止/取消所有正在运行的作业
- 关闭运行旧Flink版本的群集。
- 将Flink升级到群集上的较新版本。
- 在新版本下重新启动群集。
对于卷影副本,您需要:
- 在从保存点恢复之前,除了旧的Flink安装之外,还要设置新Flink版本的新安装。
- 使用新的Flink安装从保存点恢复。
- 如果一切正常,请停止并关闭旧的Flink集群。
在下文中,我们将首先介绍成功迁移工作的前提条件,然后详细介绍我们之前概述的步骤。
前提条件
在开始迁移之前,请检查您尝试迁移的作业是否遵循保存点的最佳做法。另外,请查看 API迁移指南,了解是否存在与将保存点迁移到较新版本相关的任何API更改。
特别是,我们建议您检查是否uid
为您的工作中的算子设置了明确的s。
这是一个软前置条件,如果您忘记分配s ,恢复应该仍然有效uid
。如果遇到无效的情况,可以使用该调用手动将以前Flink版本生成的旧版顶点ID添加到作业中setUidHash(String hash)
。对于每个 算子(在 算子链中:只有头 算子),您必须分配32个字符的十六进制字符串,表示您可以在web ui中看到的哈希值或 算子的日志。
除了算子uid之外,目前有两个难以进行的作业迁移前提条件会导致迁移失败:
我们不支持使用
semi-asynchronous
模式检查点的RocksDB中的状态迁移 。如果您的旧作业使用此模式,您仍然可以fully-asynchronous
在使用用作迁移基础的保存点之前将作业更改为使用 模式。另一个重要的前提条件是,对于Flink 1.3.x之前的保存点,所有保存点数据必须可从新安装访问并驻留在相同的绝对路径下。在Flink 1.3.x之前,保存点数据通常不会仅在创建的保存点文件中自包含。可以从保存点文件内部引用其他文件(例如,状态后台SNAPSHOT的输出)。自Flink 1.3.x以来,这不再是一个限制; 可以使用典型的文件系统 算子操作重定位保存点。
第1步:使用旧Flink版本中的保存点。
作业迁移的第一个主要步骤是在较旧的Flink版本中运行您的作业的保存点。您可以使用以下命令执行此 算子操作:
$ bin/flink savepoint :jobId [:targetDirectory]
有关更多详细信息,请阅读保存点文档。
第2步:将群集更新为新的Flink版本。
在此步骤中,我们将更新群集的框架版本。这基本上意味着用新版本替换Flink安装的内容。此步骤可能取决于您在群集中运行Flink的方式(例如,独立,在Mesos上......)。
如果您不熟悉在群集中安装Flink,请阅读部署和群集设置文档。
步骤3:从保存点恢复新Flink版本下的作业。
作为作业迁移的最后一步,您将从上面在更新的群集上获取的保存点恢复。您可以使用以下命令执行此 算子操作:
$ bin/flink run -s :savepointPath [:runArgs]
再次,有关更多详细信息,请查看保存点文档。
兼容性表
保存点与Flink版本兼容,如下表所示:
Created with \ Resumed with | 1.1.x | 1.2.x | 1.3.x | 1.4.x | 1.5.x | 1.6.x | 限制 |
---|---|---|---|---|---|---|---|
1.1.x | Ø | Ø | Ø | 从Flink 1.1.x迁移到1.2.x +的作业的最大并行度目前已确定为作业的并行性。这意味着迁移后不能增加并行性。在将来的错误修复版本中可能会删除此限制。 | |||
1.2.x | Ø | Ø | Ø | Ø | Ø | 从Flink 1.2.x迁移到Flink 1.3.x +时,不支持同时更改并行性。用户必须在迁移到Flink 1.3.x +后首先获取保存点,然后更改并行度。为CEP应用程序创建的保存点无法在1.4.x +中恢复。 | |
1.3.x | Ø | Ø | Ø | Ø | 如果保存点包含Scala案例类,则从Flink 1.3.0迁移到Flink 1.4。[0,1]将失败。用户必须直接迁移到1.4.2+。 | ||
1.4.X | Ø | Ø | Ø | ||||
1.5.x | Ø | Ø | |||||
1.6.x版 | Ø |