0
点赞
收藏
分享

微信扫一扫

Flink中State管理与恢复之状态后端Backend案例

设置 HDFS 文件系统的状态后端,取消 Job 之后再次恢复 Job。

package state

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
* @Author yqq
* @Date 2021/12/26 23:55
* @Version 1.0
*/
object TestCheckPointByHDFS {
//使用WordCount案例来测试一下HDFS的状态后端,先运行一段时间Job,然后cansol,在重新启动,看看状态是否是连续的
def main(args: Array[String]): Unit = {
//1.初始化Flink流计算的环境
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//开启CheckPoint并设置一些参数
environment.enableCheckpointing(5000)//每个5秒开启一次CheckPoint
environment.setStateBackend(new FsStateBackend("hdfs://mycluster/checkpoint/cp1"))//存放检查点数据
environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
environment.getCheckpointConfig.setCheckpointTimeout(5000)
environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
environment.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)//终止job保留检查点数据

//修改并行度
environment.setParallelism(1)
//2.导入隐式转换
import org.apache.flink.streaming.api.scala._
//3.读取数据,读取sock流中的数据,DataStream => 相当于spark中的DStream
val stream: DataStream[String] = environment.socketTextStream("node1", 8888)
//4.转换和处理数据
val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
.map((_, 1))
.keyBy(0) //分组算子,0 或者 1 代表下标,前面的DataStream[二元组],0=>代表单词 1=>代表出现的次数
.sum(1) //聚合累加算子
//5.打印结果
result.print("结果")
//6.启动流计算程序
environment.execute("wordCount")
}
}

打包在服务器上执行
Flink中State管理与恢复之状态后端Backend案例_flink
查看执行结果,可以看出 flink 出现三
Flink中State管理与恢复之状态后端Backend案例_hdfs_02

Flink中State管理与恢复之状态后端Backend案例_big data_03
取消 Job,可以看到 Job 已经停止
Flink中State管理与恢复之状态后端Backend案例_后端_04
查看 HDFS 目录上的状态文件
Flink中State管理与恢复之状态后端Backend案例_hdfs_05
重启任务,再次输入两个flink单词
Flink中State管理与恢复之状态后端Backend案例_后端_06
查看结果,flink单词统计为5
Flink中State管理与恢复之状态后端Backend案例_后端_07


举报

相关推荐

0 条评论