状态后台

译者:flink.sojb.cn

Data Stream API编写的程序通常以各种形式保存状态:

  • Windows会在触发数据元或聚合之前收集数据元或聚合
  • 转换函数可以使用键/值状态接口来存储值
  • 转换函数可以实现CheckpointedFunction接口以使其局部变量具有容错能力

另请参阅流API指南中的状态部分

激活检查点时,检查点会持续保持此类状态,以防止数据丢失并始终如一地恢复。状态如何在内部表示,以及在检查点上如何以及在何处持续取决于所选择的状态后台

可用的状态后台

开箱即用,Flink捆绑了这些状态后台:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

如果没有配置其他任何内容,系统将使用MemoryStateBackend。

MemoryStateBackend

MemoryStateBackend保存数据在内部作为Java堆的对象。键/值状态和窗口 算子包含存储值,触发器等的哈希表。

在检查点时,此状态后台将对状态进行SNAPSHOT,并将其作为检查点确认消息的一部分发送到JobManager(主服务器),JobManager也将其存储在其堆上。

可以将MemoryStateBackend配置为使用异步SNAPSHOT。虽然我们强烈建议使用异步SNAPSHOT来避免阻塞管道,但请注意,默认情况下,此函数目前处于启用状态。要禁用此函数,用户可以MemoryStateBackend在构造函数中将相应的布尔标志实例化为false(这应该仅用于调试),例如:

 new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);

MemoryStateBackend的局限性:

  • 默认情况下,每个状态的大小限制为5 MB。可以在MemoryStateBackend的构造函数中增加此值。
  • 无论配置的最大状态大小如何,状态都不能大于akka帧大小(请参阅配置)。
  • 聚合状态必须适合JobManager内存。

鼓励MemoryStateBackend用于:

  • 本地开发和调试
  • 几乎没有状态的作业,例如仅包含一次记录函数的作业(Map,FlatMap,Filter,...)。Kafka消费者需要很少的状态。

FsStateBackend

所述FsStateBackend配置有文件系统URL(类型,地址,路径),如“HDFS://名称节点:40010 /Flink/检查点”或“文件:///数据/Flink/检查点”。

FsStateBackend将正在运行的数据保存在TaskManager的内存中。在检查点时,它将状态SNAPSHOT写入配置的文件系统和目录中的文件。最小元数据存储在JobManager的内存中(或者,在高可用性模式下,存储在元数据检查点中)。

FsStateBackend 默认使用异步SNAPSHOT,以避免在编写状态检查点时阻塞处理管道。要禁用此函数,用户可以FsStateBackend在构造函数集中使用相应的布尔标志来实例化a false,例如:

 new FsStateBackend(path, false);

鼓励FsStateBackend:

  • 具有大状态,长窗口,大键/值状态的作业。
  • 所有高可用性设置。

RocksDBStateBackend

所述RocksDBStateBackend配置有文件系统URL(类型,地址,路径),如“HDFS://名称节点:40010 /Flink/检查点”或“文件:///数据/Flink/检查点”。

RocksDBStateBackend将RocksDB数据库中的飞行中数据保存在(默认情况下)存储在TaskManager数据目录中。在检查点时,整个RocksDB数据库将被检查点到配置的文件系统和目录中。最小元数据存储在JobManager的内存中(或者,在高可用性模式下,存储在元数据检查点中)。

RocksDBStateBackend始终执行异步SNAPSHOT。

RocksDBStateBackend的局限性:

  • 由于RocksDB的JNI桥接API基于byte [],因此每个Keys和每个值的最大支持大小为2 ^ 31个字节。重要提示:在RocksDB中使用合并 算子操作的状态(例如ListState)可以静默地累积> 2 ^ 31字节的值大小,然后在下次检索时失败。这是目前RocksDB JNI的一个限制。

我们鼓励RocksDBStateBackend:

  • 具有非常大的状态,长窗口,大键/值状态的作业。
  • 所有高可用性设置。

请注意,您可以保存的状态量仅受可用磁盘空间量的限制。与将状态保持在内存中的FsStateBackend相比,这允许保持非常大的状态。但是,这也意味着使用此状态后台可以实现的最大吞吐量更低。对此后台的所有读/写都必须通过去/序列化来检索/存储状态对象,这比使用堆基表示正在进行的堆上表示更昂贵。

RocksDBStateBackend是目前唯一提供增量检查点的后台(见这里)。

配置状态后台

如果您不指定任何内容,则默认状态后台是JobManager。如果要为群集上的所有作业建立不同的默认值,可以通过在flink-conf.yaml中定义新的默认状态后台来实现。可以基于每个作业覆盖默认状态后台,如下所示。

设置每个作业状态后台

每个作业状态后台StreamExecutionEnvironment在作业上设置,如下例所示:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))

设置默认状态后台

可以flink-conf.yaml使用配置Keys在配置中配置默认状态后台state.backend

config条目的可能值包括jobmanager(MemoryStateBackend),filesystem(FsStateBackend),rocksdb(RocksDBStateBackend),或实现状态后台工厂FsStateBackendFactory的类的完全限定类名,例如org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactoryRocksDBStateBackend。

state.checkpoints.dir选项定义所有后台写入检查点数据和元数据文件的目录。您可以在此处找到有关检查点目录结构的更多详细信息。

配置文件中的示例部分可能如下所示:

# The backend that will be used to store operator state checkpoints

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)


state.backend: filesystem

# Directory for storing checkpoints

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)


state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

results matching ""

    No results matching ""