0
点赞
收藏
分享

微信扫一扫

中文编程工具开发语言开发的实际案例:触摸屏点餐软件应用场景实例

一点读书 2023-10-21 阅读 23

Watermark水印、水位线

水位线

概述

水印本质

生成Watermark

dataStream.assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy);

Watermark策略

WatermarkStrategy接口如下:

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
 
    /**
     * 根据策略实例化一个 watermark 生成器
     * 主要负责按照既定的方式,基于时间戳生成水位线
     */
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context var1);
	
    /**
     * 负责从流中数据元素的某个字段中提取时间戳,并分配给元素
     * 时间戳的分配是生成水位线的基础
     */   
    default TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return new RecordTimestampAssigner();
    }
}

WatermarkStrategy工具类

        /**
         * 为时间戳单调递增的情况创建水印策略,适用于有序流
         */
        static <T > WatermarkStrategy < T > forMonotonousTimestamps() {
            return (ctx) -> new AscendingTimestampsWatermarks<>();
        }

        /**
         * 为记录无序流的情况创建水印策略,但可以设置事件无序程度的上限。
         */
        static <T > WatermarkStrategy < T > forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
            return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
        }

        /**
         * 基于watermarkgeneratorsupper自定义创建水印策略 
         */
        static <T > WatermarkStrategy < T > forGenerator(WatermarkGeneratorSupplier < T > generatorSupplier) {
            return generatorSupplier::createWatermarkGenerator;
        }

        /**
         * 创建完全不生成水印的水印策略。这在进行纯基于处理时间的流处理的场景中可能是有用
         */
        static <T > WatermarkStrategy < T > noWatermarks() {
            return (ctx) -> new NoWatermarksGenerator<>();
        }

使用forBoundedOutOfOrderness watermark生成器和一个lambda表达式作为时间戳分配器

        DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4)
                );


        SingleOutputStreamOperator<Tuple2<String, Integer>> assignTimestampsAndWatermarks = dataStreamSource.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
        );

注意:

使用Watermark策略

WatermarkStrategy在哪里使用?

1.直接在数据源上使用


2.直接在非数据源的操作之后使用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>);

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...)

注意:

内置Watermark生成器

Flink内置了两个WaterMark生成器

1.forMonotonousTimestamps: 时间戳单调增长:其实就是允许的延迟为0

WatermarkStrategy.forMonotonousTimestamps();

2.forBoundedOutOfOrderness: 允许固定时间的延迟

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

单调递增时间戳分配器

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        // 从socket接收数据流
        SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8888);

        // 将输入数据转换为Integer
        DataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));


        // 定义Watermark策略
        WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy
                // 升序的watermark,没有等待时间,即当 数字转时间戳 达到 滚动处理时间窗口10s 就触发窗口执行
                .<Integer>forMonotonousTimestamps()
                // TimestampAssigner是一个可以从事件数据中提取时间戳字段的简单函数
                // 指定时间戳分配器,从数据中提取
                .withTimestampAssigner(new SerializableTimestampAssigner<Integer>() {
                    @Override
                    public long extractTimestamp(Integer element, long recordTimestamp) {
                        // 将输入数字转时间戳,单位毫秒,当作数据的时间戳
                        System.out.println("数据 " + element);
                        return element * 1000L;
                    }
                });

        //  指定watermark策略
        SingleOutputStreamOperator<Integer> singleOutputStreamOperator = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);


        singleOutputStreamOperator
                // 事件时间语义窗口
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(
                        new ProcessAllWindowFunction<Integer, String, TimeWindow>() {
                            @Override
                            public void process(Context context, Iterable<Integer> input, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss");

                                long count = input.spliterator().estimateSize();

                                out.collect("窗口在时间区间: " + windowStart + "-" + windowEnd + " 产生" + count + "条数据,具体数据:" + input.toString());
                            }
                        }
                )
                .print();
        env.execute();
    }
> nc -lk 8888
1
2
5
8
9
10
15
18
20
21
数据 1
数据 2 
数据 5 
数据 8 
数据 9 
数据 10 
窗口在时间区间: 1970-01-01 08:00:00-1970-01-01 08:00:10 产生5条数据,具体数据:[1, 2, 5, 8, 9]
数据 15 
数据 18 
数据 20 
窗口在时间区间: 1970-01-01 08:00:10-1970-01-01 08:00:20 产生3条数据,具体数据:[10, 15, 18]
数据 21 

固定延迟的时间戳分配器

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从socket接收数据流
        SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8888);

        // 将输入数据转换为Integer
        DataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));

        // 定义Watermark策略
        WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy
                // 最大容忍的延迟时间: 定watermark生成 乱序 等待3s 即当输入 (数字转时间戳 - 3) 达到 滚动处理时间窗口10s 就触发窗口执行
                .<Integer>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // 指定时间戳分配器 从数据中提取
                .withTimestampAssigner(
                        (element, recordTimestamp) -> {
                             // 将输入数字转时间戳,单位毫秒,当作数据的时间戳
                            System.out.println("数据 " + element);
                            return element * 1000L;
                        });

        //  指定 watermark策略
        SingleOutputStreamOperator<Integer> singleOutputStreamOperator = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);


        singleOutputStreamOperator
                // 使用事件时间语义窗口
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(
                        new ProcessAllWindowFunction<Integer, String, TimeWindow>() {

                            @Override
                            public void process(Context context, Iterable<Integer> input, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = input.spliterator().estimateSize();
                                out.collect("窗口在时间区间: " + windowStart + "-" + windowEnd + " 产生" + count + "条数据,具体数据:" + input.toString());
                            }
                        }
                )
                .print();

        env.execute();
    }
 nc -lk 8888
1
5
8
6
7
11
4  
13
15
20
19
23
26
数据 1
数据 5
数据 8
数据 6
数据 7
数据 11
数据 4
数据 13
窗口在时间区间: 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生6条数据,具体数据:[1, 5, 8, 6, 7, 4]
数据 15
数据 20
数据 19
数据 23
窗口在时间区间: 1970-01-01 08:00:10.000-1970-01-01 08:00:20.000 产生4条数据,具体数据:[11, 13, 15, 19]
数据 26

自定义WatermarkGenerator

watermark 的生成方式本质上是有两种:

1.周期性生成

2.标记生成

都需要继承接口WatermarkGenerator,接口如下:

/**
 * {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。
 *
 * <p><b>注意:</b>  WatermarkGenerator 将以前互相独立的 {@code AssignerWithPunctuatedWatermarks} 
 * 和 {@code AssignerWithPeriodicWatermarks} 一同包含了进来。
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * 每来一条事件数据调用一次,可以检查或者记录事件的时间戳,或者也可以基于事件数据本身去生成 watermark
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * 周期性的调用,也许会生成新的 watermark,也许不会
     *
     * <p>调用此方法生成 watermark 的间隔时间由 {@link ExecutionConfig#getAutoWatermarkInterval()} 决定
     */
    void onPeriodicEmit(WatermarkOutput output);
}

周期性Watermark生成器

示例1:

 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从socket接收数据流
        SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8888);

        // 将输入数据转换为Integer
        DataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));

        // 默认周期 200ms 修改默认周期时间为1000ms
        env.getConfig().setAutoWatermarkInterval(1000);


        WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy
                // 自定义 周期性生成器 	3000L:延迟时间
                .<Integer>forGenerator(ctx -> new MyWatermarkGenerator<>(3000L))
                .withTimestampAssigner(
                        (element, recordTimestamp) -> {
                             // 将输入数字转时间戳,单位毫秒,当作数据的时间戳
                            System.out.println("数据 " + element);
                            return element * 1000L;
                        });


        SingleOutputStreamOperator<Integer> singleOutputStreamOperator = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);


        singleOutputStreamOperator
                // 使用事件时间语义窗口
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(
                        new ProcessAllWindowFunction<Integer, String, TimeWindow>() {

                            @Override
                            public void process(Context context, Iterable<Integer> input, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = input.spliterator().estimateSize();
                                out.collect("窗口在时间区间: " + windowStart + "-" + windowEnd + " 产生" + count + "条数据,具体数据:" + input.toString());
                            }
                        }
                )
                .print();

        env.execute();
    }

    /**
     * 该 watermark 生成器可以覆盖的场景是:数据源在一定程度上乱序。
     * 即某个最新到达的时间戳为 t 的元素将在最早到达的时间戳为 t 的元素之后最多 n 毫秒到达。
     */
    public static class MyWatermarkGenerator<T> implements WatermarkGenerator<T> {

        /**
         * 乱序等待时间
         * 允许的最大延迟时间 ms
         */
        private long maxOutOfOrderness;
        /**
         * 保存 当前为止 最大的事件时间
         */
        private long currentMaxTimestamp;

        public MyWatermarkGenerator(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
        }

        /**
         * 每条数据来,都会调用一次: 用来生产WaterMark中的时间戳
         * 为每个事件调用,允许水印生成器检查和记住事件时间戳,或根据事件本身发出水印。
         *
         * @param event
         * @param eventTimestamp 提取到数据的事件时间
         * @param output
         */
        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
            System.out.println("调用onEvent 目前为止最大时间戳 " + currentMaxTimestamp);
        }

        /**
         * 周期性调用: 发送watermark 默认200ms调用一次
         * <p>
         * 调用此方法和生成水印的时间间隔取决于ExecutionConfig.getAutoWatermarkInterval()
         *
         * @param output
         */
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 发出的 watermark = 当前最大时间戳 - 最大乱序时间
            output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
            System.out.println("调用onPeriodicEmit 生成watermark " + (currentMaxTimestamp - maxOutOfOrderness - 1));
        }
    }
    }
调用onPeriodicEmit 生成watermark -3001
调用onPeriodicEmit 生成watermark -3001
数据 5
调用onEvent 目前为止最大时间戳 5000
调用onPeriodicEmit 生成watermark 1999
数据 6
调用onEvent 目前为止最大时间戳 6000
调用onPeriodicEmit 生成watermark 2999
调用onPeriodicEmit 生成watermark 2999
数据 3
调用onEvent 目前为止最大时间戳 6000
调用onPeriodicEmit 生成watermark 2999
调用onPeriodicEmit 生成watermark 2999
数据 13
调用onEvent 目前为止最大时间戳 13000
调用onPeriodicEmit 生成watermark 9999
窗口在时间区间: 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生3条数据,具体数据:[5, 6, 3]
调用onPeriodicEmit 生成watermark 9999
调用onPeriodicEmit 生成watermark 9999
调用onPeriodicEmit 生成watermark 9999
数据 10
调用onEvent 目前为止最大时间戳 13000
调用onPeriodicEmit 生成watermark 9999
调用onPeriodicEmit 生成watermark 9999

示例2:

/**
 * 该生成器生成的 watermark 滞后于处理时间固定量。它假定元素会在有限延迟后到达 Flink。
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxTimeLag = 5000; // 5 秒

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // 处理时间场景下不需要实现
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}

标记Watermark生成器

        WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy
                // 自定义间歇性生成器
                .<Integer>forGenerator(ctx -> new MyWatermarkGenerator<>(3000L))
                .withTimestampAssigner(
                        (element, recordTimestamp) -> {
                             // 将输入数字转时间戳,单位毫秒,当作数据的时间戳
                            System.out.println("数据 " + element);
                            return element * 1000L;
                        });
    public static class MyWatermarkGenerator<T> implements WatermarkGenerator<T> {

        /**
         * 乱序等待时间
         * 允许的最大延迟时间 ms
         */
        private long maxOutOfOrderness;
        /**
         * 保存 当前为止 最大的事件时间
         */
        private long currentMaxTimestamp;

        public MyWatermarkGenerator(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
        }

        /**
         * 每条数据来,都会调用一次: 用来提取最大的事件时间,保存下来,并发送watermark
         *
         * @param event
         * @param eventTimestamp 提取到的数据的 事件时间
         * @param output
         */
        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
            output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
            System.out.println("调用onEvent  目前为止最大时间戳 " + currentMaxTimestamp + " 生成watermark " + (currentMaxTimestamp - maxOutOfOrderness - 1));
        }

        /**
         * 周期性调用: 不需要
         *
         * @param output
         */
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {

        }
    }
数据 5
调用onEvent  目前为止最大时间戳 5000 生成watermark 1999
数据 6
调用onEvent  目前为止最大时间戳 6000 生成watermark 2999
数据 3
调用onEvent  目前为止最大时间戳 6000 生成watermark 2999
数据 13
调用onEvent  目前为止最大时间戳 13000 生成watermark 9999
窗口在时间区间: 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生3条数据,具体数据:[5, 6, 3]

Watermark策略与Kafka连接器

使用 Apache Kafka 连接器作为数据源时,每个Kafka分区可能有一个简单的事件时间模式(递增的时间戳或有界无序)

当使用Kafka数据源时,多个分区常常并行使用,因此交错来自各个分区的事件数据就会破坏每个分区的事件时间模式

在这种情况下,可以使用Flink中可识别Kafka分区的watermark生成机制。

使用此特性,将在Kafka消费端内部针对每个Kafka分区生成watermark,并且不同分区watermark的合并方式与在数据流shuffle时的合并方式相同。

注意:

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                // 指定kafka节点的地址和端口
                .setBootstrapServers("node01:9092,node02:9092,node03:9092")
                // 指定消费者组的id
                .setGroupId("flink_group")
                // 指定消费的 Topic
                .setTopics("flink_topic")
                // 指定反序列化器,反序列化value
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // flink消费kafka的策略
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();


        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");

        DataStreamSink<String> kafka_source = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafka_source").print("Kafka");

        stream.print("Kafka");

        env.execute();
    }

其他

处理空闲数据源

为了解决这个问题,可以使用WatermarkStrategy来检测空闲输入并将其标记为空闲状态。WatermarkStrategy为此提供了一个工具接口:

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));

并行度下的水位线传递

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 从socket接收数据流
        SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8888);

        // 将输入数据转换为Integer
        DataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));

        // 将数据合理地分发到不同的分区中
        DataStream<Integer> partitionCustom = dataStream.partitionCustom(new MyPartitioner(), value -> value);

        // 定义Watermark策略
        WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy
                // 时间序列递增,没有等待时间,即当输入 数字转时间戳 达到 滚动处理时间窗口10s 就触发窗口执行
                .<Integer>forMonotonousTimestamps()
                // 将输入数字转时间戳,单位毫秒,当作数据的时间戳
                .withTimestampAssigner((r, ts) -> r * 1000L);

        //  指定 watermark策略
        SingleOutputStreamOperator<Integer> singleOutputStreamOperator = partitionCustom.assignTimestampsAndWatermarks(watermarkStrategy);

        // 分2组窗口 数据%分区数,分成两组: 奇数一组,偶数一组
        SingleOutputStreamOperator<String> process = singleOutputStreamOperator.keyBy(a -> a % 2)
                // 使用事件时间语义窗
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(
                        new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
                            @Override
                            public void process(Integer key, Context context, Iterable<Integer> input, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = input.spliterator().estimateSize();
                                out.collect("分组 " + key + " 的窗口在时间区间: " + windowStart + "-" + windowEnd + " 产生" + count + "条数据,具体数据:" + input.toString());
                            }
                        }
                );

        process.print();
        env.execute();
    }

    public static class MyPartitioner implements Partitioner<Integer> {

        @Override
        public int partition(Integer key, int numPartitions) {
            if (key % 2 == 0) {
                // 将偶数分配到第一个分区
                return 0;
            } else {
                // 将奇数分配到第二个分区
                return 1;
            }
        }
    }

发送测试数据

> nc -lk 8888
1
3
5
7
9
11
13
15
17

此时,控制台不会有任何输出,原因如下:

因此,这里可以使用上面提到的处理空闲数据源,设置空闲等待即可解决

        // 定义Watermark策略
        WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy
                // 升序的watermark,没有等待时间,即当输入 数字 达到 滚动处理时间窗口10s 就触发窗口执行
                .<Integer>forMonotonousTimestamps()
                // 指定时间戳分配器 从数据中提取
                .withTimestampAssigner((r, ts) -> r * 1000L)
                //空闲等待5s
                .withIdleness(Duration.ofSeconds(5));
2> 分组 1 的窗口在时间区间: 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生5条数据,具体数据:[1, 3, 5, 7, 9]

迟到数据的处理

1.推迟水印推进

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

2.设置窗口延迟关闭

.window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(3))

3.使用侧流接收迟到的数据

.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(3)).sideOutputLateData(lateWS)

实现示例

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从socket接收数据流
        SingleOutputStreamOperator<String> source = env.socketTextStream("IP", 8888);

        // 将输入数据转换为Integer
        DataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));

        // 定义Watermark策略
        WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy
                .<Integer>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((element, recordTimestamp) -> element * 1000L);

        //  指定 watermark策略
        SingleOutputStreamOperator<Integer> sensorDSwithWatermark = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);


        OutputTag<Integer> lateTag = new OutputTag<>("late-data", Types.POJO(Integer.class));

        SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(sensor -> sensor % 2)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(2)) // 推迟2s关窗
                .sideOutputLateData(lateTag) // 关窗后的迟到数据,放入侧输出流
                .process(
                        new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {

                            @Override
                            public void process(Integer key, Context context, Iterable<Integer> input, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = input.spliterator().estimateSize();

                                out.collect("分组 " + key + " 的窗口在时间区间: " + windowStart + "-" + windowEnd + " 产生" + count + "条数据,具体数据:" + input.toString());
                            }
                        }
                );


        process.print();
        // 从主流获取侧输出流,打印
        process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");
        env.execute();
    }
举报

相关推荐

0 条评论