0
点赞
收藏
分享

微信扫一扫

大数据Hadoop之——Flink的状态管理和容错机制(checkpoint)

爱情锦囊 2022-03-11 阅读 104

文章目录

一、Flink中的状态

官方文档

  • 数据流中的数据有重复,想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
  • 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
  • 对一个时间窗口内的数据进行聚合分析,分析一个小时内某项指标的75分位或99分位的数值。

1)键控状态(Keyed State)

在这里插入图片描述

1、控件状态特点

  • 键控状态是根据输入数据流中定义的键(key)来维护和访问的
  • Flink 为每个 key 维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态
  • 当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key

2、键控状态类型

键控状态类型说明方法
ValueState[T]值状态,保存一个可以更新和检索的值ValueState.update(value: T)
ValueState.value()
ListState[T]列表状态,保存一个元素的列表可以往这个列表中追加数据,并在当前的列表上进行检索。ListState.add(value: T)
ListState.addAll(values: java.util.List[T])
ListState.update(values: java.util.List[T])
ListState.get()(注意:返回的是Iterable[T]
ReducingState聚合状态,保存一个单值,表示添加到状态的所有值的聚合,接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。ReducingState.add(value: T)
ReducingState.get()
AggregatingState<IN, OUT>聚合状态,保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。AggregatingState.add(value: T)
AggregatingState.get()
MapState<UK, UV>映射状态,维护了一个映射列表,保存Key-Value对。MapState.get(key: K)
MapState.put(key: K, value: V)
MapState.contains(key: K)
MapState.remove(key: K)

3、状态有效期 (TTL)

【官网示例】

package com
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

object StateTest001 {
  def main(args: Array[String]): Unit = {
    val ttlConfig = StateTtlConfig
      .newBuilder(Time.seconds(1))
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build

    val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
    stateDescriptor.enableTimeToLive(ttlConfig)
    
  }
}

TTL 配置有以下几个选项:

  • newBuilder 的第一个参数表示数据的有效期,是【必选项】。
  • TTL 的更新策略(默认是 OnCreateAndWrite)
    1. StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新
    2. StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新
1)过期数据的清理
import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .disableCleanupInBackground
    .build
2)全量快照时进行清理
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot
    .build
3)增量数据清理
4)在 RocksDB 压缩时清理
import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build

【注意】

  • 如果没有 state 访问,也没有处理数据,则不会清理过期数据。
  • 增量清理会增加数据处理的耗时。
  • 现在仅 Heap state backend 支持增量清除机制。在 RocksDB state backend 上启用该特性无效。
  • 如果 Heap state backend 使用同步快照方式,则会保存一份所有 key 的拷贝,从而防止并发修改问题,因此会增加内存的使用。但异步快照则没有这个问题。
  • 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。

4、键控状态的使用

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

2)算子状态(Operatior State)

在这里插入图片描述

1、算子状态特点

  • 算子状态的作用范围限定为算子任务,由同一并行任务所处理的所有数据都可以访问到相同的状态
  • 状态对于同一子任务而言是共享
  • 算子状态不能由相同或不同算子的另一个子任务访问

2、算子状态类型

键控状态类型说明
列表状态(ListState)将状态表示为一组数据的列表
联合列表状态(UnionListState)也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复
广播状态(BroadcastState)如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。

3)广播状态 (Broadcast State)

  • 它具有 map 格式,
  • 它仅在一些特殊的算子中可用。这些算子的输入为一个广播数据流和非广播数据流,
  • 这类算子可以拥有不同命名的多个广播状态 。

二、状态后端(State Backends)

1)三种状态存储方式

存储方式说明
MemoryStateBackend【默认模式】状将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上,将checkpoint存储在JobManager的内存中。主要适用于本地开发和调试
FsStateBackend基于文件系统进行存储,可以是本地文件系统,也可以是 HDFS 等分布式文件系统。 需要注意而是虽然选择使用了 FsStateBackend ,但正在进行的数据仍然是存储在 TaskManager 的内存中的,只有在 checkpoint 时,才会将状态快照写入到指定文件系统上。
RocksDBStateBackend将所有状态序列化后,存入本地的RocksDB中存储。

对于HeapKeyedStateBackend,有两种实现:

  • 支持异步 Checkpoint(默认):存储格式 CopyOnWriteStateMap
  • 仅支持同步 Checkpoint:存储格式 NestedStateMap

2)配置方式

1、【第一种方式】基于代码方式进行配置

// 配置 FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink/checkpoints"));
// 配置 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:port/flink/checkpoints"));

配置 RocksDBStateBackend 时,需要额外导入下面的依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.9.0</version>
</dependency>

2、【第二种方式】基于 flink-conf.yaml 配置文件的方式进行配置

state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints

三、容错机制(checkpoint)

1)一致性

  • at-most-once:至多一次。故障发生之后,计算结果可能丢失,就是无法保证结果的正确性;
  • at-least-once:至少一次。计算结果可能大于正确值,但绝不会小于正确值,就是计算程序发生故障后可能多算,但是绝不可能少算;
  • exactly-once:精确一次。系统保证发生故障后得到的计算结果的值和正确值一致;

2)检查点(checkpoint)

1、开启与配置 Checkpoint

2、Checkpoint 属性

属性说明
精确一次(exactly-once)你可以选择向 enableCheckpointing(long interval, CheckpointingMode mode) 方法中传入一个模式来选择保证等级级别。
checkpoint 超时如果 checkpoint 执行的时间超过了该配置的阈值,还在进行中的 checkpoint 操作就会被抛弃
checkpoints 之间的最小时间该属性定义在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展。如果值设置为了 5000, 无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。
checkpoint 可容忍连续失败次数该属性定义可容忍多少次连续的 checkpoint 失败。超过这个阈值之后会触发作业错误 fail over。 默认次数为“0”,这意味着不容忍 checkpoint 失败,作业将在第一次 checkpoint 失败时fail over。
并发 checkpoint 的数目默认情况下,在上一个 checkpoint 未完成(失败或者成功)的情况下,系统不会触发另一个 checkpoint。这确保了拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程。 不过允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法所调用比较耗时的外部服务),但是仍然想进行频繁的 checkpoint 去最小化故障后重跑的 pipelines 来说,是有意义的。
externalized checkpoints你可以配置周期存储 checkpoint 到外部系统中。Externalized checkpoints 将他们的元数据写到持久化存储上并且在 job 失败的时候不会被自动删除。 这种方式下,如果你的 job 失败,你将会有一个现有的 checkpoint 去恢复。更多的细节请看 Externalized checkpoints 的部署文档。

【官网示例】

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000)

// 高级选项:

// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig.setCheckpointTimeout(60000)

// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2)

// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
env.getCheckpointConfig().enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

// 开启实验性的 unaligned checkpoints
env.getCheckpointConfig.enableUnalignedCheckpoints()

3)从检查点恢复状态

  • 【第一步】遇到故障之后,第一步就是重启应用
  • 【第二步】是从 checkpoint 中读取状态,将状态重置,从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同
  • 【第三步】开始消费并处理检查点到发生故障之间的所有数据,这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly- once)的一致性,因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置

4)检查点的实现算法

  • 【一种简单的想法】:暂停应用,保存状态到检查点,再重新恢复应用
  • 【Flink 的改进实现】:
    1. 基于 Chandy-Lamport 算法的分布式快照
    2. 将检查点的保存和数据处理分离开,不暂停整个应用

5)检查点算法

1、检查点分界线(Checkpoint Barrier)

在这里插入图片描述

  • 将barrier插入到数据流中,作为数据流的一部分和数据一起向下流动。Barrier不会干扰正常数据,数据流严格有序。

  • 一个barrier把数据流分割成两部分:一部分进入到当前快照,另一部分进入到下一个快照。

  • 每一个barrier都带有快照ID,并且barrier之前的数据都进入了此快照。Barrier不会干扰数据流处理,所以非常轻量。

  • 多个不同快照的多个barrier会在流中同时出现,即多个快照可能同时创建。

2、Barrier对齐

在这里插入图片描述

3、执行一次检查点步骤

  1. jobManager会向每个source任务发送一条带有新检查点ID的消息,通过这种方式来启动检查点。
  2. 数据源将他们各自的状态写入检查点后,并向下游所有分区发出一个检查点barrier。状态后端在状态存入检查点之后,会返回通知给source任务,source任务再向jobmanager确认检查点完成。
  3. barrier向下游传递,下游任务会等待所有输入分区的barrier的到达后再做状态保存通知jobmanager状态保存完成,并再向下游所有分区发送收到的检查点barrier。
  1. sink任务向jobmanager确认状态保存到checkpoint完成。即所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了。

6)保存点(savepoint)

1、概述

  • Flink 还提供了可以自定义的镜像保存功能,就是保存点(savepoints)
  • 原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点;
  • Flink不会自动创建保存点,因此用户(或者外部调度程序)必须明确地触发创建操作;
  • 保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用,等等。

2、savepoint触发的三种方式

  1. 使用 flink savepoint 命令触发 Savepoint,其是在程序运行期间触发 savepoint。

  2. 使用 flink cancel -s 命令,取消作业时,并触发 Savepoint。

  3. 使用 Rest API 触发 Savepoint,格式为:/jobs/:jobid /savepoints

7)检查点(checkpoint)与 保存点(savepoint)的区别与联系

  • checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场。
  • savepoint是“通过checkpoint机制”创建的,所以savepoint本质上是特殊的checkpoint。
  • checkpoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预savepoint面向用户完全根据用户的需要触发与清理
  • checkpoint的频率往往比较高(因为需要尽可能保证作业恢复的准确度),所以checkpoint的存储格式非常轻量级,但作为trade-off牺牲了一切可移植(portable)的东西,比如不保证改变并行度和升级的兼容性。savepoint则以二进制形式存储所有状态数据和元数据,执行起来比较慢而且“贵”,但是能够保证portability,如并行度改变或代码升级之后,仍然能正常恢复。

未完待续,请耐心等待~

举报

相关推荐

0 条评论