Table of Contents
- Flink series articles
- III. Example: setting the restart strategy in the program
  - 1. Demo code
  - 2. Maven dependencies
  - 3. Implementation (Flink 1.13.6)
    - 1) Serialization
    - 2) Implementation
    - 3) Verification
  - 4. Implementation (Flink 1.17.0)
    - 1) Serialization
    - 2) Implementation
    - 3) Verification
- IV. Example: manual restart to verify the checkpoint
  - 1. Maven packaging
  - 2. Uploading the packaged jar
  - 3. Verifying the program
  - 4. Manual recovery
  - 5. Verification
- V. Example: manual recovery via savepoint
This article shows how to set a restart strategy in a Flink program, how to restart a job manually and confirm that its state is restored from a checkpoint, and how to recover manually from a savepoint, with detailed verification steps and results for each.
Apart from the Maven dependencies listed below, the examples have no other project dependencies.
They do, however, require working Hadoop, Kafka, and Flink cluster environments.
III. Example: setting the restart strategy in the program
This example sinks data to Kafka. Because Flink's Kafka connector API has changed across versions, two implementations are given, one per connector generation.
1. Demo code
The code covers the four restart strategies; verify whichever one fits your situation. This example focuses on the fixed-delay (fixed number of attempts) restart strategy.
2. Maven dependencies
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-common</artifactId>
	<version>3.1.4</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-client</artifactId>
	<version>3.1.4</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-hdfs</artifactId>
	<version>3.1.4</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-sql-connector-kafka_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-jdbc_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-csv</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-json</artifactId>
	<version>${flink.version}</version>
</dependency>
3. Implementation (Flink 1.13.6)
1) Serialization
package org.datastreamapi.checkpoint.serialization;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * Kafka sink serialization schema for Tuple2<String, Integer> records.
 * 
 * @author alanchan
 *
 */
public class AlanKafkaSerializationSchema_Tuple implements KafkaSerializationSchema<Tuple2<String, Integer>> {
	String topic;
	public AlanKafkaSerializationSchema_Tuple(String topic) {
		this.topic = topic;
	}
	@Override
	public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, Integer> element, Long timestamp) {
		return new ProducerRecord<>(topic, (element.f0 + ":" + element.f1).getBytes());
	}
}
2) Implementation
package org.datastreamapi.checkpoint;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.datastreamapi.checkpoint.serialization.AlanKafkaSerializationSchema_Tuple;
/**
 * @author alanchan
 *
 */
public class TestCheckpointRestartStrategyDemo {
	public static void main(String[] args) throws Exception {
		System.setProperty("HADOOP_USER_NAME", "alanchan");
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
		// checkpoint
		env.enableCheckpointing(1000);
		env.setStateBackend(new FsStateBackend("hdfs://server1:8020//flinktest/flinkckp"));
		env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
		env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
		// Configure the restart strategy:
		// 1. With checkpointing enabled, the default is Integer.MAX_VALUE restart attempts with automatic recovery
		// 2. No restart: RestartStrategies.noRestart()
		// 3. Fixed-delay restart: RestartStrategies.fixedDelayRestart
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, // restart at most 3 times
				Time.of(5, TimeUnit.SECONDS) // delay between restart attempts
		));
		// 4. Failure-rate restart: RestartStrategies.failureRateRestart
		// If the job fails no more than 3 times within 2 minutes, it restarts automatically with a 10s delay between attempts;
		// if the failure rate is exceeded, the job exits.
//		env.setRestartStrategy(RestartStrategies.failureRateRestart(3, // max failures per measuring interval
//				Time.of(2, TimeUnit.MINUTES), // measuring interval for the failure rate
//				Time.of(10, TimeUnit.SECONDS) // delay between two consecutive restart attempts
//		));
		// Source
		DataStream<String> linesDS = env.socketTextStream("192.168.10.42", 9999);
		// Transformation
		DataStream<Tuple2<String, Integer>> wordTuple = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
			@Override
			public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
				String[] words = value.split(",");
				for (String word : words) { // sensitive word check: vx:alanchanchn
					if (word.equals("vx:alanchanchn")) {
						System.out.println("Sensitive word detected ...... the WeChat ID alanchanchn must not appear.");
						throw new Exception("Sensitive word detected ...... the WeChat ID alanchanchn must not appear.");
					}
					out.collect(Tuple2.of(word, 1));
				}
			}
		});
		DataStream<Tuple2<String, Integer>> sumResult = wordTuple.keyBy(t -> t.f0).sum(1);
		// sink
		sumResult.print();
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "server1:9092");
		props.setProperty("transaction.timeout.ms", "3000");
		String topic = "t_kafkasink";
//				FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
		FlinkKafkaProducer<Tuple2<String, Integer>> kafkaSink = new FlinkKafkaProducer<>(topic, new AlanKafkaSerializationSchema_Tuple(topic), props,
				FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
		sumResult.addSink(kafkaSink);
		// 5.execute
		env.execute();
	}
}
3) Verification
Verification really has three parts: the application console output, the Kafka output, and the checkpoint on HDFS.
Since this example only demonstrates the restart strategy, the latter two parts are not covered in detail here.
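If you do want to confirm that externalized checkpoints are being retained on HDFS, listing the checkpoint directory configured in the code is enough. A minimal sketch, assuming the HDFS path from this example; the job ID subdirectory is a placeholder and will differ on your cluster:

# list retained checkpoints for the job; each one shows up as a chk-<n> subdirectory
hdfs dfs -ls hdfs://server1:8020/flinktest/flinkckp/<job-id>

The application console output from the restart test follows: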
5> (alanchan,1)
5> (alanchanchn,1)
5> (alanchan,2)
13> (alan,1)
5> (chan,1)
11> (chn,1)
Sensitive word detected ...... the WeChat ID must not appear.
11> (chn,2)
5> (alanchan,3)
5> (alanchanchn,2)
11> (chn,2)
10> (vx:alanchanchn,1)
5> (alanchan,3)
5> (alanchanchn,2)
Sensitive word detected ...... the WeChat ID must not appear.
11> (chn,3)
5> (alanchan,4)
5> (alanchanchn,3)
Sensitive word detected ...... the WeChat ID must not appear.
Sensitive word detected ...... the WeChat ID must not appear.
5> (alanchan,4)
11> (chn,3)
5> (alanchanchn,3)
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
# The application threw the exception and exited
4. Implementation (Flink 1.17.0)
1) Serialization
package org.datastreamapi.checkpoint.serialization;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
/**
 * @author alanchan
 *
 */
public class KafkaValueSerializationSchema_Tuple implements SerializationSchema<Tuple2<String, Integer>> {
	@Override
	public byte[] serialize(Tuple2<String, Integer> element) {
		return (element.f0 + ":" + element.f1).getBytes();
	}
}
2) Implementation
package org.datastreamapi.checkpoint;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.datastreamapi.checkpoint.serialization.KafkaValueSerializationSchema_Tuple;
/**
 * @author alanchan
 *
 */
public class TestCheckpointRestartStrategyDemo2 {
	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		System.setProperty("HADOOP_USER_NAME", "alanchan");
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
		// checkpoint
		env.enableCheckpointing(1000);
		env.setStateBackend(new FsStateBackend("hdfs://server1:8020//flinktest/flinkckp"));
		env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
		env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, // restart at most 3 times
				Time.of(5, TimeUnit.SECONDS) // delay between restart attempts
		));
		// Source
		DataStream<String> linesDS = env.socketTextStream("192.168.10.42", 9999);
		// Transformation
		DataStream<Tuple2<String, Integer>> wordTuple = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
			@Override
			public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
				String[] words = value.split(",");
				for (String word : words) { // sensitive word check: vx:alanchanchn
					if (word.equals("vx:alanchanchn")) {
						System.out.println("Sensitive word detected ...... the WeChat ID alanchanchn must not appear.");
						throw new Exception("Sensitive word detected ...... the WeChat ID alanchanchn must not appear.");
					}
					out.collect(Tuple2.of(word, 1));
				}
			}
		});
		DataStream<Tuple2<String, Integer>> sumResult = wordTuple.keyBy(t -> t.f0).sum(1);
		// sink
		sumResult.print();
		String topic = "t_kafkasink";
		KafkaSink<Tuple2<String, Integer>> kafkaSink = KafkaSink.<Tuple2<String, Integer>>builder()
				.setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092")
				.setRecordSerializer(KafkaRecordSerializationSchema.builder()
						.setTopic(topic)
						.setValueSerializationSchema(new KafkaValueSerializationSchema_Tuple())
						.build()
					)
				.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
				.build();
		
		sumResult.sinkTo(kafkaSink);
		
		// 5.execute
		env.execute();
	}
}
3) Verification
Omitted; see the verification of the Flink 1.13.6 implementation above.
IV. Example: manual restart to verify the checkpoint
Using the example from [III. Example: setting the restart strategy in the program], package the application and upload it to the Flink cluster.
For Maven packaging and submitting jobs to a Flink cluster, see the article in this column: 1. Flink 1.12.7/1.13.5: detailed introduction, local installation, deployment and verification.
1. Maven packaging
mvn package -Dmaven.test.skip=true
2. Uploading the packaged jar
Upload address: http://server1:8081/#
After the upload succeeds, set the entry class to run, i.e. the class containing the main method, and submit.
Once submitted, the job is in the running state.


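If you prefer the command line to the Web UI, the same jar can also be submitted with flink run. A minimal sketch, assuming the Flink install path used later in this article and a placeholder jar name (use whatever your mvn package step actually produced):

# submit the job from the command line instead of the Web UI
/usr/local/flink-1.13.5/bin/flink run --class org.datastreamapi.checkpoint.TestCheckpointRestartStrategyDemo /usr/local/bigdata/testdata/<your-packaged-jar>.jar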
3. Verifying the program
Verification is the same as in the IDE above: type data into nc and observe the output in Kafka.
The key point to check: does the job restart automatically?
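To observe the Kafka side, you can attach a console consumer to the sink topic before typing data into nc. A minimal sketch, assuming the standard Kafka console scripts are available on a broker host (adjust the script path to your Kafka installation):

# watch the sink topic while the job is running
kafka-console-consumer.sh --bootstrap-server server1:9092 --topic t_kafkasink --from-beginning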
- Input data
 
[root@server2 ~]# nc -lk 9999
aa,
bb,aa
cc,bb,aa,a
dd
aa
cc
- Kafka console output
 
aa:1
bb:1
aa:2
dd:1
aa:3
cc:1
aa:4
bb:2
4. Manual recovery
When resubmitting the job in the Web UI, fill the path of the retained checkpoint into the restore-point (savepoint path) field.
The path in this example is: hdfs://server2:8020/flinktest/flinkckp/0f93e35e25c3fb87ee8ce3d6393d6344/chk-129
After filling it in, submit the job; once it succeeds, the job page opens and the task runs again.

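Recovery does not have to go through the Web UI: flink run accepts a retained checkpoint path via -s, just as it does a savepoint path. A minimal sketch, assuming the checkpoint path above, the Flink install path used later in this article, and a placeholder jar name:

# resume the job from the retained checkpoint from the command line
/usr/local/flink-1.13.5/bin/flink run -s hdfs://server2:8020/flinktest/flinkckp/0f93e35e25c3fb87ee8ce3d6393d6344/chk-129 --class org.datastreamapi.checkpoint.TestCheckpointRestartStrategyDemo /usr/local/bigdata/testdata/<your-packaged-jar>.jar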
5. Verification
Verify again: check whether the previously computed results still exist, and whether entering the same keys continues to accumulate on top of the earlier counts.
- nc input
 
[root@server2 ~]# nc -lk 9999
dd
bb
aa
a
- Kafka console output
 
dd:2
aa:4
bb:3
cc:2
This completes the manual recovery-from-checkpoint verification. In production this kind of recovery is usually performed automatically by the system and needs no human intervention. When a restart is needed for reasons outside the program itself, such as planned maintenance or a cluster restart, it is common to trigger it manually and to take a savepoint deliberately beforehand.
The next section covers savepoints.
V. Example: manual recovery via savepoint
In production, when a cluster has to be stopped for maintenance or scaled out, you take a savepoint first. A savepoint is essentially a manually triggered checkpoint (a barrier is injected manually), so a snapshot of all of the program's state is taken and saved; once maintenance or scaling is finished, the job can be restored from the latest savepoint directory.
This example uses Flink's YARN session mode for job submission.
# Start a YARN session
/usr/local/flink-1.13.5/bin/yarn-session.sh -n 2 -tm 1024 -s 1 -d
# Run the job; checkpoints are taken automatically
/usr/local/flink-1.13.5/bin/flink run --class org.checkpoint.CheckpointRestartStrategyDemo /usr/local/bigdata/testdata/original-window_state_checkpoint_watermaker-0.0.1-SNAPSHOT.jar
# Manually create a savepoint, which is equivalent to manually taking a checkpoint
# 225125bc4ddf3f69190ebcb8e82e428f is the id of the running job
/usr/local/flink-1.13.5/bin/flink savepoint 225125bc4ddf3f69190ebcb8e82e428f hdfs://server1:8020//flinktest/flinkckp
# Cancel the job
/usr/local/flink-1.13.5/bin/flink cancel 225125bc4ddf3f69190ebcb8e82e428f
# Restart the job, manually loading the savepoint data
# savepoint-702b87-0a11b997fa70 is the directory name generated automatically when the savepoint was taken
/usr/local/flink-1.13.5/bin/flink run -s hdfs://server1:8020/flinktest/savepoint/savepoint-702b87-0a11b997fa70 --class org.checkpoint.CheckpointRestartStrategyDemo /usr/local/bigdata/testdata/original-window_state_checkpoint_watermaker-0.0.1-SNAPSHOT.jar
# Stop the YARN session
# There are several ways to stop it, for example killing the session process or stopping it from the Web UI

This concludes the article: it showed how to set restart strategies in a Flink program, how to restart a job manually and confirm that its state is recovered from a checkpoint, and how to recover manually via a savepoint, with detailed verification steps and results.