1、需求:基于事件时间处理数据。先按 key 分区,用 5 秒的滚动窗口求每个 key 在窗口内的和;再对这些求和结果开一个 10 秒的全局(windowAll)滚动窗口,取其中和最大的那条记录。
2、代码实现
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
 * Flink streaming demo: per-key tumbling-window sums followed by a global maximum.
 *
 * <p>Input lines are read from a text socket on {@code localhost:8888}. Each line has the
 * form {@code timestamp,key,value}: the first field is parsed as a {@code long} and used as
 * the event timestamp, the second is the partitioning key, and the third is an {@code int}
 * that gets summed.
 *
 * <p>Pipeline:
 * <ol>
 *   <li>assign event-time timestamps and watermarks (zero allowed out-of-orderness);</li>
 *   <li>key by the second field and sum the values in 5-second tumbling event-time windows;</li>
 *   <li>collect all per-key sums in a 10-second tumbling event-time window over the whole
 *       stream and emit the record with the largest sum.</li>
 * </ol>
 */
public class Demo {

    /** Separator between the fields of one input line. */
    private static final String FIELD_SEPARATOR = ",";

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> line = env.socketTextStream("localhost", 8888);

        // Timestamps are taken from the raw line before it is parsed into a tuple.
        SingleOutputStreamOperator<Tuple3<Long, String, Integer>> mapStream = line
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(new LineTimestampAssigner()))
                .map(new LineParser());

        // First window: the sum of the values of every key inside each 5-second window.
        SingleOutputStreamOperator<Tuple4<Long, Long, String, Integer>> windowStream = mapStream
                .keyBy(e -> e.f1)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new KeyedSumFunction());
        windowStream.print("sum=>");

        // Second window: the largest per-key sum across all keys in each 10-second window.
        // maxBy (not max) is required here: max only guarantees that field 3 carries the
        // maximum and keeps the remaining fields of an arbitrary earlier record, so the
        // printed window bounds and key could belong to a different record than the sum.
        windowStream
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .maxBy(3)
                .print("max=>");

        env.execute();
    }

    /** Reads the event timestamp from the first comma-separated field of a raw input line. */
    private static final class LineTimestampAssigner implements SerializableTimestampAssigner<String> {
        @Override
        public long extractTimestamp(String value, long recordTimestamp) {
            return Long.parseLong(value.split(FIELD_SEPARATOR)[0]);
        }
    }

    /** Parses a raw {@code timestamp,key,value} line into a (timestamp, key, value) tuple. */
    private static final class LineParser implements MapFunction<String, Tuple3<Long, String, Integer>> {
        @Override
        public Tuple3<Long, String, Integer> map(String s) throws Exception {
            String[] fields = s.split(FIELD_SEPARATOR);
            return new Tuple3<Long, String, Integer>(
                    Long.parseLong(fields[0]), fields[1], Integer.parseInt(fields[2]));
        }
    }

    /**
     * Sums the value field of all records of one key in one window and emits
     * (window start, window end, key, sum).
     */
    private static final class KeyedSumFunction
            extends ProcessWindowFunction<Tuple3<Long, String, Integer>, Tuple4<Long, Long, String, Integer>, String, TimeWindow> {
        @Override
        public void process(
                String key,
                Context context,
                Iterable<Tuple3<Long, String, Integer>> elements,
                Collector<Tuple4<Long, Long, String, Integer>> out) throws Exception {
            int sum = 0;
            for (Tuple3<Long, String, Integer> element : elements) {
                sum += element.f2;
            }
            TimeWindow window = context.window();
            out.collect(new Tuple4<Long, Long, String, Integer>(
                    window.getStart(), window.getEnd(), key, sum));
        }
    }
}
3、数据测试
/**
- 输入数据:
- 100000,a,1
- 100001,b,1
- 100002,c,1
- 100003,a,10
- 100004,b,5
- 100005,c,1
- 第一个窗口 [100000,105000) 触发(由下一条数据 105005 到达后水位线越过 104999 触发)
- 105005,d,2
- 105006,a,10
- 105007,b,4
- 105008,c,3
- 120000,a,1
- 第二个窗口 [105000,110000) 和 max 窗口 [100000,110000) 触发(120000 到达后水位线越过 109999)
- 输出数据:
- sum=>:4> (100000,105000,c,2)
- sum=>:6> (100000,105000,a,11)
- sum=>:2> (100000,105000,b,6)
- sum=>:5> (105000,110000,d,2)
- sum=>:4> (105000,110000,c,3)
- sum=>:2> (105000,110000,b,4)
- sum=>:6> (105000,110000,a,10)
- max=>:4> (100000,105000,a,11)
*/