Flink (7): A Session Windows Example

白衣蓝剑冰魄 2022-03-12
flink

I. Introduction

Session windows group elements into sessions of activity. Unlike tumbling and sliding windows, session windows do not overlap and have no fixed start and end time. Instead, a session window closes when it has received no elements for a certain period of time, i.e., when a gap of inactivity occurs. The assigner is configured with a session gap that defines how long this period of inactivity is; once the gap expires, the current session closes and subsequent elements are assigned to a new session window. The example below puts this into practice.

For an overview of window types, see: Flink(四) :窗口简介_在前进的路上-CSDN博客
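Before the Flink example, the merging behavior can be sketched in plain Java with no Flink dependency (the class and method names here are ours, chosen for illustration): each event opens a window [t, t + gap), and windows that overlap or touch are merged into one session.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionSketch {

    /** A merged session window: [start, end), with an element count. */
    static class Session {
        long start, end;
        int count;
        Session(long start, long end) { this.start = start; this.end = end; this.count = 1; }
        @Override public String toString() {
            return "(" + count + "," + start + "," + end + ")";
        }
    }

    /**
     * Assigns each timestamp a window [t, t + gap) and merges windows that
     * overlap or touch, mirroring event-time session-window semantics.
     * Timestamps must be sorted in ascending order.
     */
    static List<Session> sessionize(long[] sortedTimestamps, long gapMs) {
        List<Session> sessions = new ArrayList<>();
        for (long t : sortedTimestamps) {
            Session last = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
            if (last != null && t <= last.end) {
                // New window [t, t + gap) overlaps or touches the previous session: merge.
                last.end = Math.max(last.end, t + gapMs);
                last.count++;
            } else {
                sessions.add(new Session(t, t + gapMs));
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        // The timestamps used in section III below, with a 5-second gap.
        long[] ts = {1642263313000L, 1642263316000L, 1642263320000L,
                     1642263325000L, 1642263331000L, 1642263338000L};
        System.out.println(sessionize(ts, 5000L));
        // → [(4,1642263313000,1642263330000), (1,1642263331000,1642263336000), (1,1642263338000,1642263343000)]
    }
}
```

Note that two events exactly one gap apart (20 s and 25 s here) still share a session, because the window [20 s, 25 s) touches the window starting at 25 s; only a gap strictly greater than 5 seconds splits sessions.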

II. Example


import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.time.Duration;
import java.util.Map;
import java.util.Properties;

public class WindowsSessionTest {

    public static void main(String[] args) {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties consumerProperties = new Properties();
            consumerProperties.setProperty("bootstrap.servers", "127.0.0.1:9092");
            consumerProperties.setProperty("group.id", "test-2");
            consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            FlinkKafkaConsumer<String> consumer =
                    new FlinkKafkaConsumer<>("test_window", new SimpleStringSchema(), consumerProperties);

            // Resume from the offsets last committed by this consumer group,
            // so the group.id property above is required.
            consumer.setStartFromGroupOffsets();

            // Add the Kafka source.
            DataStreamSource<String> sourceStream = env.addSource(consumer);

            // Parse each JSON record into a Map and assign event-time timestamps and watermarks.
            DataStream<Map<String, Object>> dataStream = sourceStream
                    .map(new MapFunction<String, Map<String, Object>>() {
                        @Override
                        public Map<String, Object> map(String value) throws Exception {
                            System.out.println(value);
                            return JSON.parseObject(value);
                        }
                    })
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                                    .withTimestampAssigner(new SerializableTimestampAssigner<Map<String, Object>>() {
                                        @Override
                                        public long extractTimestamp(Map<String, Object> map, long recordTimestamp) {
                                            return Long.parseLong(map.get("time").toString());
                                        }
                                    }));

            // Key by the "key" field and apply an event-time session window with a 5-second gap.
            DataStream<Tuple4<String, Integer, Long, Long>> windowDataStream = dataStream
                    .keyBy(new KeySelector<Map<String, Object>, Tuple1<String>>() {
                        @Override
                        public Tuple1<String> getKey(Map<String, Object> value) throws Exception {
                            return Tuple1.of(value.get("key").toString());
                        }
                    })
                    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
                    .process(new ProcessWindowFunction<Map<String, Object>, Tuple4<String, Integer, Long, Long>, Tuple1<String>, TimeWindow>() {
                        @Override
                        public void process(Tuple1<String> key, Context context,
                                            Iterable<Map<String, Object>> elements,
                                            Collector<Tuple4<String, Integer, Long, Long>> out) {
                            // Emit (key, element count, window start, window end) for each session.
                            Tuple4<String, Integer, Long, Long> tp4 = new Tuple4<>();
                            int count = 0;
                            for (Map<String, Object> map : elements) {
                                count++;
                                tp4.f0 = map.get("key").toString();
                            }
                            tp4.f1 = count;
                            tp4.f2 = context.window().getStart();
                            tp4.f3 = context.window().getEnd();
                            out.collect(tp4);
                        }
                    });

            windowDataStream.print();
            env.execute("test_windows");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Notes

1. The watermark strategy (matching the code above, with zero allowed out-of-orderness):

   WatermarkStrategy.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(0))

2. The event-time session window with a 5-second gap:

   EventTimeSessionWindows.withGap(Time.seconds(5))

III. Results

Input data sent to the Kafka topic:

{"key":"001","time":1642263313000}
{"key":"001","time":1642263316000}
{"key":"001","time":1642263320000}
{"key":"001","time":1642263325000}
{"key":"001","time":1642263331000}
{"key":"001","time":1642263338000}
Output:

  1> (001,4,1642263313000,1642263330000)

  1> (001,1,1642263331000,1642263336000)

The first four events (at 13 s, 16 s, 20 s, and 25 s) are each at most 5 seconds apart, so their windows merge into a single session [1642263313000, 1642263330000) containing 4 elements. The event at 31 s arrives more than 5 seconds after the previous one, so it opens a new session [1642263331000, 1642263336000). The event at 38 s opens a third session, but nothing is printed for it: no later event arrives to push the watermark past that window's end.
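Why does the 38-second event produce no output? With forBoundedOutOfOrderness(Duration.ofSeconds(0)), Flink's bounded-out-of-orderness strategy emits watermark = max observed timestamp − delay − 1 ms, and an event-time window fires only once the watermark reaches the window's last timestamp (end − 1 ms). A plain-Java sketch of that check follows (the class and method names are ours, not Flink's):

```java
public class WatermarkSketch {

    /**
     * Watermark emitted by a bounded-out-of-orderness strategy after observing
     * events with timestamps up to maxTimestamp: maxTimestamp - delay - 1 ms.
     */
    static long watermark(long maxTimestamp, long outOfOrdernessMs) {
        return maxTimestamp - outOfOrdernessMs - 1;
    }

    /** An event-time window [start, end) fires once the watermark reaches end - 1. */
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long delay = 0;
        // After the 31 s event, watermark = 1642263330999,
        // which closes the first session ending at 1642263330000.
        System.out.println(fires(1642263330000L, watermark(1642263331000L, delay))); // true
        // After the 38 s event, watermark = 1642263337999,
        // which closes the second session ending at 1642263336000.
        System.out.println(fires(1642263336000L, watermark(1642263338000L, delay))); // true
        // No later event arrives, so the watermark never reaches the end of
        // the third session at 1642263343000, and that window never fires.
        System.out.println(fires(1642263343000L, watermark(1642263338000L, delay))); // false
    }
}
```

Sending one more record with a sufficiently late timestamp would advance the watermark and flush the pending third session.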