0
点赞
收藏
分享

微信扫一扫

【C 数据结构】双向链表

半夜放水 04-15 16:30 阅读 2
kafkaflink

记录kafka-flink-kafka的end-to-end的exactly-once语义

步骤

  1. 开启checkpoint、stateBackend的设置和checkpoint配置
  2. 设置kafka source的配置
  3. 读取kafka source message
  4. 随意的transformation;并打印结果
  5. kafka sink端的配置
  6. 输出到kafka sink端
  7. 执行

代码

package com.javaye.demo.exactly;

import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @Author: Java页大数据
 * @Date: 2024-04-11:17:59
 * @Describe:
 *  kafka - flink - kafka 验证end-to-end的exactly once
 */
public class ExactlyOnce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//        1.1. 开启checkpoint,间隔为1000L ms
        env.enableCheckpointing(1000L);

//        1.2. stateBackend:checkpoint持久化目录
        if (SystemUtils.IS_OS_WINDOWS) {
            env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        } else {
            env.setStateBackend(new FsStateBackend("hdfs://only:9870/flink-checkpoints"));
        }

        CheckpointConfig config = env.getCheckpointConfig();
//        1.3. ckp的配置
//        1.3.1. 前后两次checkpoint的最小间隔:防止前后两次的checkpoint重叠
        config.setMinPauseBetweenCheckpoints(500L);
//        1.3.2. 容忍5次checkpoint失败
        config.setTolerableCheckpointFailureNumber(5);
//        1.3.3. job被取消时,保留外部的checkpoint
        config.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        1.3.4. 设置checkpoint的语义为 exactly-once
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        1.3.5. 设置checkpoint的超时时间,若checkpoint超过该超时时间则说明该次checkpoint失败,丢弃该checkpoint
        config.setCheckpointTimeout(60 * 1000);
//        1.3.6. 设置同一时刻允许多少个checkpoint同时执行
        config.setMaxConcurrentCheckpoints(1);

//        1.4. 设置重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));


//        2. 设置kafka source的配置
        String kafkaServer = "only:9092";
        String sourceTopic = "flink_kafka_source";
        String groupId = "flink_kafka_source_exactly_once";
        String clientIdPrefix = "flink_exactly_once";
        Properties kafkaSourceProp = new Properties();
        KafkaSource<String> kafkaSource = KafkaSource
                .<String>builder()
                .setBootstrapServers(kafkaServer)
                .setTopics(sourceTopic)
                .setGroupId(groupId)
                .setClientIdPrefix(clientIdPrefix)
                .setStartingOffsets(OffsetsInitializer.latest()) // Start from latest offset
                .setProperty("partition.discovery.interval.ms", "50000") // discover new partitions per 50 seconds
                .setProperty("auto.offset.reset", "latest")
                .setValueOnlyDeserializer(new SimpleStringSchema())
//                执行checkpoint时提交offset到checkpoint,flink内部使用,并且提交一份到默认主题__consumer_offsets
//                .setCommitOffsetsOnCheckpoints(true) // checkpoint开启默认为true,否则为false;不支持该方法
                .setProperties(kafkaSourceProp)
                .build();

//        3. 读取kafka source message
        DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "flink_kafka_exactly_once", TypeInformation.of(String.class));

//        4. 随意的transformation
        SingleOutputStreamOperator<String> flatMapDS = kafkaDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(",");
                for (String word : words) {
                    Random random = new Random();
                    int i = random.nextInt(5);
                    if (i > 3) {
                        System.out.println("模拟出现bug...");
                        throw new RuntimeException("模拟出现bug...");
                    }
                    System.out.println(word + "===" + i);
                    out.collect(word + "===" + i);
                }
            }
        });

//        4.1. 打印结果容易观察
        flatMapDS.print();

//        5. kafka sink端的配置
        Properties kafkaSinkProp = new Properties();
        kafkaSinkProp.setProperty("transaction.timeout.ms", 1000 * 5 + ""); //设置事务超时时间,也可在kafka配置中设置
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers(kafkaServer)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("flink_kafka_sink")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setKafkaProducerConfig(kafkaSinkProp)
                .build();

//        6. 输出到kafka sink端
        flatMapDS.sinkTo(kafkaSink);

//        7. 执行
        env.execute(ExactlyOnce.class.getName());
    }
}

举报

相关推荐

0 条评论