这篇文章主要来讲清 Watermark多并行 的执行机制,我们用代码及输入数据和输出数据来测试并验证。
实体代码
public class EventData{
private String id;
private Long eventTime;
private String data;
private Integer num;
public EventData(){
};
public Integer getNum() {
return num;
}
public void setNum(Integer num) {
this.num = num;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getEventTime() {
return eventTime;
}
public void setEventTime(Long eventTime) {
this.eventTime = eventTime;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
@Override
public String toString() {
return "EventData{" +
"id=" + id +
", eventTime=" + eventTime +
", data='" + data + '\'' +
", num=" + num +
'}';
}
public EventData(String id, Long eventTime, String data, Integer num) {
this.id = id;
this.eventTime = eventTime;
this.data = data;
this.num = num;
}
}
Watermark和KeyBy的关系
逻辑代码,并行度为 1
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> dataStream = env.socketTextStream("192.168.200.58", 7777);
// 数据转换
DataStream<EventData> stream = dataStream.map(new MapFunction<String, EventData>() {
@Override
public EventData map(String value) throws Exception {
String[] strs = value.split(",");
return new EventData(
strs[0],
Long.valueOf(strs[1]),
String.valueOf((Long.valueOf(strs[1])-5)),
Integer.valueOf(strs[3])
);
}
}).assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<EventData>(Time.seconds(5)) {
//提取时间戳
@Override
public long extractTimestamp(EventData element) {
return element.getEventTime() * 1000L;
}
}
);
stream.print("WM: ");
// 基于事件时间的开窗聚合,统计15秒内数据的最小ID值
stream.keyBy(EventData::getId)
.window(TumblingEventTimeWindows.of(Time.seconds(15)))
.sum("num")
.print("result: ");
env.execute("test");
}
数据
1a,1623051400,test data,1
2b,1623051402,test data,1
1a,1623051404,test data,2
2b,1623051405,test data,3
1a,1623051407,test data,6
3c,1623051417,test data,1
结果
WM: > EventData{id=1a, eventTime=1623051400, data='1623051395', num=1}
WM: > EventData{id=2b, eventTime=1623051402, data='1623051397', num=1}
WM: > EventData{id=1a, eventTime=1623051404, data='1623051399', num=2}
WM: > EventData{id=2b, eventTime=1623051405, data='1623051400', num=3}
WM: > EventData{id=1a, eventTime=1623051407, data='1623051402', num=6}
WM: > EventData{id=3c, eventTime=1623051417, data='1623051412', num=1}
result: > EventData{id=1a, eventTime=1623051400, data='1623051395', num=3}
result: > EventData{id=2b, eventTime=1623051402, data='1623051397', num=1}
数据里只有 id=1a 加上了相同 ID 的 num 值,但是 2b 却没有加,因为 1a 的事件时间开窗范围是 [390-405),2b 有两个开窗范围,分别是402对应[390-405)、405对应[405-420)。简单说,就是每个数据的事件时间到来都会去做一次桶(范围)的计算(包括是否需要新建一个桶),只有相同桶(范围)的在一起进行keyBy和sum。输出两条数据,是因为分区有两个。
Watermark 多并行
当并行度为2,的时候。
测试数据
6,1623051400,test data,1
6,1623051401,test data,1
6,1623051402,test data,1
6,1623051405,test data,3
6,1623051406,test data,3
6,1623051409,test data,3
6,1623051410,test data,5
结果
WM: :1> EventData{id=6, eventTime=1623051400, data='1623051395', num=1}
WM: :2> EventData{id=6, eventTime=1623051401, data='1623051396', num=1}
WM: :1> EventData{id=6, eventTime=1623051402, data='1623051397', num=1}
WM: :2> EventData{id=6, eventTime=1623051405, data='1623051400', num=3}
WM: :1> EventData{id=6, eventTime=1623051406, data='1623051401', num=3}
WM: :2> EventData{id=6, eventTime=1623051409, data='1623051404', num=3}
WM: :1> EventData{id=6, eventTime=1623051410, data='1623051405', num=5}
WM: :2> EventData{id=6, eventTime=1623051410, data='1623051405', num=5}
result: :1> EventData{id=6, eventTime=1623051400, data='1623051395', num=3}
在并行任务中,桶(范围)会存在多个,但每个并行任务共享一个最小(时间)的桶(范围),只有每个并行任务都达到或超过最小桶的水位线,窗口才会触发。
user1,2,3 就是并行任务,window1,2,3,4,5 都是桶,其中 window1是最小(时间)的桶,只有 user1,2,3 数据的事件时间都大于 window1 的 windows size,window1 才会触发计算。
数据倾斜
数据倾斜的造成是因为,我user1,user2,这两个一直都有数据的最新事件时间,但是user3 的事件时间一直在window1的范围,导致我 user1,user2 都到了 window5 的窗口生成了,而没办法结束 window1 窗口。