0
点赞
收藏
分享

微信扫一扫

Flink-1.12(七) Watermark多并行,Watermark和KeyBy的关系,以及数据倾斜

静鸡鸡的JC 2021-09-21 阅读 44

这篇文章主要来讲清 Watermark多并行 的执行机制,我们用代码及输入数据和输出数据来测试并验证。
实体代码

public class EventData{

    private String id;
    private Long eventTime;
    private String data;
    private Integer num;

    public EventData(){

    };

    public Integer getNum() {
        return num;
    }

    public void setNum(Integer num) {
        this.num = num;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getEventTime() {
        return eventTime;
    }

    public void setEventTime(Long eventTime) {
        this.eventTime = eventTime;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "EventData{" +
                "id=" + id +
                ", eventTime=" + eventTime +
                ", data='" + data + '\'' +
                ", num=" + num +
                '}';
    }

    public EventData(String id, Long eventTime, String data, Integer num) {
        this.id = id;
        this.eventTime = eventTime;
        this.data = data;
        this.num = num;
    }
}
Watermark和KeyBy的关系

逻辑代码,并行度为 1

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> dataStream = env.socketTextStream("192.168.200.58", 7777);
        // 数据转换
        DataStream<EventData> stream = dataStream.map(new MapFunction<String, EventData>() {
            @Override
            public EventData map(String value) throws Exception {
                String[] strs = value.split(",");
                return new EventData(
                        strs[0],
                        Long.valueOf(strs[1]),
                        String.valueOf((Long.valueOf(strs[1])-5)),
                        Integer.valueOf(strs[3])
                );
            }
        }).assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<EventData>(Time.seconds(5)) {
                //提取时间戳
                @Override
                public long extractTimestamp(EventData element) {
                    return element.getEventTime() * 1000L;
                }
            }
        );
        stream.print("WM: ");
        // 基于事件时间的开窗聚合,统计15秒内数据的最小ID值
        stream.keyBy(EventData::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(15)))
                .sum("num")
                .print("result: ");
        env.execute("test");
    }

数据

1a,1623051400,test data,1
2b,1623051402,test data,1
1a,1623051404,test data,2
2b,1623051405,test data,3
1a,1623051407,test data,6
3c,1623051417,test data,1

结果

WM: > EventData{id=1a, eventTime=1623051400, data='1623051395', num=1}
WM: > EventData{id=2b, eventTime=1623051402, data='1623051397', num=1}
WM: > EventData{id=1a, eventTime=1623051404, data='1623051399', num=2}
WM: > EventData{id=2b, eventTime=1623051405, data='1623051400', num=3}
WM: > EventData{id=1a, eventTime=1623051407, data='1623051402', num=6}
WM: > EventData{id=3c, eventTime=1623051417, data='1623051412', num=1}
result: > EventData{id=1a, eventTime=1623051400, data='1623051395', num=3}
result: > EventData{id=2b, eventTime=1623051402, data='1623051397', num=1}

数据里只有 id=1a 加上了相同 ID 的 num 值,但是 2b 却没有加,因为 1a 的事件时间开窗范围是 [390-405),2b 有两个开窗范围,分别是402对应[390-405)、405对应[405-420)。简单说,就是每个数据的事件时间到来都会去做一次桶(范围)的计算(包括是否需要新建一个桶),只有相同桶(范围)的在一起进行keyBy和sum。输出两条数据,是因为分区有两个。

Watermark 多并行

当并行度为2,的时候。
测试数据

6,1623051400,test data,1
6,1623051401,test data,1
6,1623051402,test data,1
6,1623051405,test data,3
6,1623051406,test data,3
6,1623051409,test data,3
6,1623051410,test data,5

结果

WM: :1> EventData{id=6, eventTime=1623051400, data='1623051395', num=1}
WM: :2> EventData{id=6, eventTime=1623051401, data='1623051396', num=1}
WM: :1> EventData{id=6, eventTime=1623051402, data='1623051397', num=1}
WM: :2> EventData{id=6, eventTime=1623051405, data='1623051400', num=3}
WM: :1> EventData{id=6, eventTime=1623051406, data='1623051401', num=3}
WM: :2> EventData{id=6, eventTime=1623051409, data='1623051404', num=3}
WM: :1> EventData{id=6, eventTime=1623051410, data='1623051405', num=5}
WM: :2> EventData{id=6, eventTime=1623051410, data='1623051405', num=5}
result: :1> EventData{id=6, eventTime=1623051400, data='1623051395', num=3}

在并行任务中,桶(范围)会存在多个,但每个并行任务共享一个最小(时间)的桶(范围),只有每个并行任务都达到或超过最小桶的水位线,窗口才会触发。

user1,2,3 就是并行任务,window1,2,3,4,5 都是桶,其中 window1是最小(时间)的桶,只有 user1,2,3 数据的事件时间都大于 window1 的 windows size,window1 才会触发计算。

数据倾斜

数据倾斜的造成是因为,我user1,user2,这两个一直都有数据的最新事件时间,但是user3 的事件时间一直在window1的范围,导致我 user1,user2 都到了 window5 的窗口生成了,而没办法结束 window1 窗口。

举报

相关推荐

0 条评论