1. Incremental aggregation: the aggregate is computed once for every element that enters the window.
Example:
package Flink_Window;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Word count over an unbounded socket stream, using Flink's native window support.
public class SocketInfiniteWindow2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource = env.socketTextStream("192.168.208.112", 8821, "\n");

        // Split each line into words and emit (word, 1) pairs.
        DataStream<Tuple2<String, Integer>> windowCounts = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] split = s.split("\\W+");
                for (String word : split) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        // Incremental aggregation: reduce is invoked once for every element that enters the window.
        DataStream<Tuple2<String, Integer>> result = windowCounts
                .keyBy(0)
                .timeWindow(Time.seconds(100))
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t0, Tuple2<String, Integer> t1) throws Exception {
                        System.out.println("operator: " + Thread.currentThread().getId() + " reducing elements: " + t0 + "\t" + t1);
                        return Tuple2.of(t0.f0, t0.f1 + t1.f1);
                    }
                });

        result.print();
        env.execute("SocketInfiniteWindow2");
    }
}
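To feed the example, start a socket server on the source host first (for example with nc -lk 8821) and type whitespace-separated words; each word is counted within the 100-second window. Besides ReduceFunction, the same per-element (incremental) aggregation can also be expressed with an AggregateFunction. The following is a minimal sketch, not part of the original example: it assumes the same windowCounts stream, Flink version, and imports as the class above, plus org.apache.flink.api.common.functions.AggregateFunction, and the accumulator layout is an illustrative choice.

// Minimal sketch (assumption): incremental word count via AggregateFunction instead of reduce.
windowCounts
        .keyBy(0)
        .timeWindow(Time.seconds(100))
        .aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> createAccumulator() {
                // start with an empty key and a count of 0
                return Tuple2.of("", 0);
            }
            @Override
            public Tuple2<String, Integer> add(Tuple2<String, Integer> value, Tuple2<String, Integer> acc) {
                // called once per incoming element, just like reduce
                return Tuple2.of(value.f0, acc.f1 + value.f1);
            }
            @Override
            public Tuple2<String, Integer> getResult(Tuple2<String, Integer> acc) {
                return acc;
            }
            @Override
            public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
                return Tuple2.of(a.f0, a.f1 + b.f1);
            }
        })
        .print();

Compared with reduce, AggregateFunction lets the accumulator and output types differ from the input type, which is useful for aggregates such as averages.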
2. Full aggregation: the window buffers all of its elements, and the computation runs once over the whole window when the window fires.
Example:
package Flink_Window;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;

// Word count over an unbounded socket stream, using Flink's native window support.
public class SocketInfiniteWindow2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource = env.socketTextStream("192.168.208.112", 8821, "\n");

        // Split each line into words and emit (word, 1) pairs.
        DataStream<Tuple2<String, Integer>> windowCounts = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] split = s.split("\\W+");
                for (String word : split) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        // Incremental aggregation (same as example 1):
        // DataStream<Tuple2<String, Integer>> result = windowCounts
        //         .keyBy(0)
        //         .timeWindow(Time.seconds(100))
        //         .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        //             @Override
        //             public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t0, Tuple2<String, Integer> t1) throws Exception {
        //                 System.out.println("operator: " + Thread.currentThread().getId() + " reducing elements: " + t0 + "\t" + t1);
        //                 return Tuple2.of(t0.f0, t0.f1 + t1.f1);
        //             }
        //         });
        // result.print();

        // Full aggregation: the two key classes are ProcessWindowFunction and its Context.
        DataStream<Tuple2<String, Integer>> result = windowCounts
                .keyBy(0)
                .timeWindow(Time.seconds(100))
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple key, Context context, Iterable<Tuple2<String, Integer>> v2s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        System.out.println("------------");
                        System.out.println("operator: " + Thread.currentThread().getId() + " window range: " + sdf.format(context.window().getStart()) + "\t" + sdf.format(context.window().getEnd()));
                        System.out.println("operator: " + Thread.currentThread().getId() + " window range (millis): " + context.window().getStart() + "\t" + context.window().getEnd());
                        // Sum the counts of all buffered elements once the window fires.
                        int sum = 0;
                        for (Tuple2<String, Integer> v2 : v2s) {
                            sum += v2.f1;
                        }
                        collector.collect(Tuple2.of(key.getField(0), sum));
                    }
                });

        result.print();
        env.execute("SocketInfiniteWindow2");
    }
}
The two most important classes for full aggregation are ProcessWindowFunction and its Context: ProcessWindowFunction receives all of the window's buffered elements at once when the window fires, and Context exposes window metadata such as the start and end timestamps.
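The two styles can also be combined: the WindowedStream API offers a reduce(ReduceFunction, ProcessWindowFunction) overload in which elements are pre-aggregated incrementally and the ProcessWindowFunction then receives only the single reduced value plus the window metadata from Context (check that your Flink version provides this overload). Below is a minimal sketch, not part of the original examples: it assumes the same windowCounts stream and imports as the class above, and the output string format is an illustrative choice.

// Minimal sketch (assumption): incremental pre-aggregation via reduce, combined with
// window metadata from the ProcessWindowFunction Context.
windowCounts
        .keyBy(0)
        .timeWindow(Time.seconds(100))
        .reduce(
                new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
                        // invoked once per incoming element, as in example 1
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                },
                new ProcessWindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple key, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) {
                        // elements holds exactly one value: the incrementally reduced result
                        Tuple2<String, Integer> agg = elements.iterator().next();
                        out.collect("window [" + context.window().getStart() + ", " + context.window().getEnd() + ") "
                                + agg.f0 + " -> " + agg.f1);
                    }
                })
        .print();

This pattern keeps the low state footprint of incremental aggregation (only one value per key and window is stored) while still giving access to the window's start and end times when the result is emitted.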