0
点赞
收藏
分享

微信扫一扫

Flink中State管理与恢复之Savepint案例

Savepoints 是检查点的一种特殊实现,底层实现其实也是使用 Checkpoints 的机制。 Savepoints 是用户以手工命令的方式触发 Checkpoint,并将结果持久化到指定的存储路径 中,其主要目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免因为停机 运维或者升级应用等正常终止应用的操作而导致系统无法恢复到原有的计算状态的情况,从 而无法实现从端到端的 Excatly-Once 语义保证。

1.配置 Savepoints 的存储路径
在 flink-conf.yaml 中配置 SavePoint 存储的位置,设置后,如果要创建指定 Job 的 SavePoint,可以不用在手动执行命令时指定 SavePoint 的位置。

state.savepoints.dir: hdfs://mycluster/savepoint/

2.在代码中设置算子 ID
为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,强烈推荐程序员 通过手动给算子赋予 ID,这些 ID 将用于确定每一个算子的状态范围。如果不手动给各算子 指定 ID,则会由 Flink 自动给每个算子生成一个 ID。而这些自动生成的 ID 依赖于程序的结 构,并且对代码的更改是很敏感的。因此,强烈建议用户手动设置 ID。

package state

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
* @Author yqq
* @Date 2021/12/27 14:49
* @Version 1.0
*/
object TestSavePoints {
def main(args: Array[String]): Unit = {
//1.初始化Flink流计算的环境
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//修改并行度
// environment.setParallelism(1)
//2.导入隐式转换
import org.apache.flink.streaming.api.scala._
//3.读取数据,读取sock流中的数据,DataStream => 相当于spark中的DStream
val stream: DataStream[String] = environment.socketTextStream("node1", 8888)
.uid("sock001")
//4.转换和处理数据
val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
.uid("flatmap001")
.map((_, 1))
.uid("map001")
.keyBy(0) //分组算子,0 或者 1 代表下标,前面的DataStream[二元组],0=>代表单词 1=>代表出现的次数
.sum(1) //聚合累加算子
.uid("sum001")
//5.打印结果
result.print("结果")
//6.启动流计算程序
environment.execute("wordCount")
}
}

3.触发 SavePoint
先启动Job

[root@node1 bin]# ./flink run -c state.TestSavePoints /root/test/Flink-test-1.0-SNAPSHOT.jar 
Starting execution of program

输入单词

[root@node1 ~]# nc -lk 8888
hello flink flink

Flink中State管理与恢复之Savepint案例_scala
再取消Job ,触发SavePoint

[root@node1 bin]# ./flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
27.12.2021 16:29:12 : 8b960baee4269a804938e31792f984d3 : wordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
[root@node1 bin]# ./flink savepoint 8b960baee4269a804938e31792f984d3
Triggering savepoint for job 8b960baee4269a804938e31792f984d3.
Waiting for response...
Savepoint completed. Path: hdfs://mycluster/savepoint/savepoint-8b960b-fc5cc036f1f2
You can resume your program from this savepoint with the run command.
[root@node1 bin]# ./flink cancel 8b960baee4269a804938e31792f984d3
Cancelling job 8b960baee4269a804938e31792f984d3.
Cancelled job 8b960baee4269a804938e31792f984d3.

从 SavePoint 启动 Job、也可以通过 Web UI 启动 Job

[root@node1 bin]# ./flink run -s hdfs://mycluster/savepoint/savepoint-8b960b-fc5cc036f1f2 -c state.TestSavePoints /root/test/Flink-test-1.0-SNAPSHOT.jar 
Starting execution of program

再次输入一个单词flink

[root@node1 ~]# nc -lk 8888
hello flink flink
flink

查看结果,理论有三个flink单词
Flink中State管理与恢复之Savepint案例_big data_02


举报

相关推荐

0 条评论