0
点赞
收藏
分享

微信扫一扫

大数据之flink中join用法

贵州谢高低 2022-02-25 阅读 75


1、将两个流中的数据进行join处理

package cn._51doit.flink.day05;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Demo 1: inner join of two streams on a tumbling event-time window.
 *
 * <p>Reads lines of the form "timestamp,key,value" (e.g. "1000,A,1") from two
 * socket sources, assigns event-time timestamps from the first field, keys both
 * streams on the second field, and joins elements that fall into the same
 * 5-second tumbling window and share the same key.
 */
public class TumblingWindowJoinDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Use event time for windowing (this API is deprecated in newer Flink versions).
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Left input lines, e.g. "1000,A,1"
        DataStreamSource<String> leftLines = env.socketTextStream("localhost", 8888);
        // Right input lines, e.g. "2000,A,2"
        DataStreamSource<String> rightLines = env.socketTextStream("localhost", 9999);

        // Extract the event time (first CSV field, presumably epoch millis) of the
        // first stream; allowed out-of-orderness is 0 seconds.
        DataStream<String> leftWaterMarkStream = leftLines
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        // Extract the event time of the second stream in the same way.
        DataStream<String> rightWaterMarkStream = rightLines
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        // Parse the first stream into Tuple3<timestamp, key, value>.
        DataStream<Tuple3<Long, String, String>> leftStream = leftWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        // Parse the second stream into Tuple3<timestamp, key, value>.
        DataStream<Tuple3<Long, String, String>> rightStream = rightWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        // The left stream joins the right stream; where() selects the left-side join
        // key and equalTo() selects the right-side join key.
        DataStream<Tuple6<Long, String, String, Long, String, String>> joinedStream = leftStream.join(rightStream)
                .where(new KeySelector<Tuple3<Long, String, String>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, String> value) throws Exception {
                        return value.f1; // use f1 of the left tuple as the join key
                    }
                })
                .equalTo(new KeySelector<Tuple3<Long, String, String>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, String> value) throws Exception {
                        return value.f1; // use f1 of the right tuple as the join key
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 5-second tumbling event-time window
                .apply(new MyInnerJoinFunction()); // combine each matched pair via MyInnerJoinFunction
        joinedStream.print(); // print sink for the joined results
        env.execute("TumblingWindowJoinDemo");
    }

}
package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;

/**
 * Combines one element from each joined stream into a single {@code Tuple6}.
 *
 * <p>Flink calls {@link #join} once per pair of elements that share the same
 * join key and fall into the same window — i.e. inner-join semantics.
 */
public class MyInnerJoinFunction implements JoinFunction<
        Tuple3<Long, String, String>,   // element type of the first (left) stream
        Tuple3<Long, String, String>,   // element type of the second (right) stream
        Tuple6<Long, String, String, Long, String, String>> { // joined output type

    @Override
    public Tuple6<Long, String, String, Long, String, String> join(
            Tuple3<Long, String, String> first,
            Tuple3<Long, String, String> second) throws Exception {
        // Concatenate the three fields of each side into one six-field tuple.
        return Tuple6.of(first.f0, first.f1, first.f2, second.f0, second.f1, second.f2);
    }
}

2、左外连接

package cn._51doit.flink.day05;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Demo 2: LEFT OUTER join of two streams on a tumbling event-time window,
 * implemented with {@code coGroup} (Flink's windowed {@code join} only supports
 * inner-join semantics).
 *
 * <p>Input lines have the form "timestamp,key,value" (e.g. "1000,A,1").
 * Elements of the left stream are emitted even when no right-side element with
 * the same key exists in the same window.
 */
public class TumblingWindowLeftOuterJoinDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Left input lines, e.g. "1000,A,1"
        DataStreamSource<String> leftSteam = env.socketTextStream("localhost", 8888);
        // Right input lines, e.g. "2000,A,2"
        DataStreamSource<String> rightStream = env.socketTextStream("localhost", 9999);

        // Extract the event time (first CSV field) of the first stream;
        // allowed out-of-orderness is 0 seconds.
        DataStream<String> leftWaterMarkStream = leftSteam
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        // Extract the event time of the second stream in the same way.
        DataStream<String> rightWaterMarkStream = rightStream
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        // Parse the first stream into Tuple3<timestamp, key, value>.
        DataStream<Tuple3<Long, String, String>> leftTuple = leftWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        // Parse the second stream into Tuple3<timestamp, key, value>.
        DataStream<Tuple3<Long, String, String>> rightTuple = rightWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        // Left-outer-join the two streams via coGroup: within each window, matched
        // pairs are emitted, and unmatched LEFT elements are emitted with nulls.
        DataStream<Tuple6<Long, String, String, Long, String, String>> joinedStream = leftTuple.coGroup(rightTuple)
                .where(new KeySelector<Tuple3<Long, String, String>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, String> value) throws Exception {
                        return value.f1;
                    }
                })
                .equalTo(new KeySelector<Tuple3<Long, String, String>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, String> value) throws Exception {
                        return value.f1;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new MyLeftOuterJoinFunction());

        joinedStream.print();

        env.execute();
    }
}
package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.util.Collector;

/**
 * Left-outer-join semantics on top of {@code coGroup}: every left element is
 * emitted; when no right element shares its key in the window, the right-hand
 * fields of the output tuple are {@code null}.
 */
public class MyLeftOuterJoinFunction implements CoGroupFunction<
        Tuple3<Long, String, String>,   // element type of the left stream
        Tuple3<Long, String, String>,   // element type of the right stream
        Tuple6<Long, String, String, Long, String, String>> { // output type

    @Override
    public void coGroup(Iterable<Tuple3<Long, String, String>> first,
                        Iterable<Tuple3<Long, String, String>> second,
                        Collector<Tuple6<Long, String, String, Long, String, String>> out) throws Exception {
        // Both iterables are fixed for this key/window, so the right side's
        // emptiness can be determined once up front.
        boolean rightIsEmpty = !second.iterator().hasNext();
        for (Tuple3<Long, String, String> left : first) {
            if (rightIsEmpty) {
                // No match in the window: emit the left element padded with nulls.
                out.collect(Tuple6.of(left.f0, left.f1, left.f2, null, null, null));
            } else {
                // Emit the cross product of this left element with every right element.
                for (Tuple3<Long, String, String> right : second) {
                    out.collect(Tuple6.of(left.f0, left.f1, left.f2, right.f0, right.f1, right.f2));
                }
            }
        }
    }
}

3、右外连接

package cn._51doit.flink.day05;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Demo 3: RIGHT OUTER join of two streams on a tumbling event-time window,
 * implemented with {@code coGroup}.
 *
 * <p>Input lines have the form "timestamp,key,value" (e.g. "1000,A,1").
 * Elements of the right stream are emitted even when no left-side element with
 * the same key exists in the same window.
 */
public class TumblingWindowRightOuterJoinDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Left input lines, e.g. "1000,A,1"
        DataStreamSource<String> leftSteam = env.socketTextStream("localhost", 8888);
        // Right input lines, e.g. "2000,A,2"
        DataStreamSource<String> rightStream = env.socketTextStream("localhost", 9999);

        // Extract the event time (first CSV field) of the first stream;
        // allowed out-of-orderness is 0 seconds.
        DataStream<String> leftWaterMarkStream = leftSteam
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        // Extract the event time of the second stream in the same way.
        DataStream<String> rightWaterMarkStream = rightStream
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        // Parse the first stream into Tuple3<timestamp, key, value>.
        DataStream<Tuple3<Long, String, String>> leftTuple = leftWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        // Parse the second stream into Tuple3<timestamp, key, value>.
        DataStream<Tuple3<Long, String, String>> rightTuple = rightWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        // Use coGroup to implement a RIGHT outer join (the original comment said
        // "left join", which contradicts the class name and the apply() function).
        DataStream<Tuple6<Long, String, String, Long, String, String>> joinedStream = leftTuple.coGroup(rightTuple)
                .where(new KeySelector<Tuple3<Long, String, String>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, String> value) throws Exception {
                        return value.f1;
                    }
                })
                .equalTo(new KeySelector<Tuple3<Long, String, String>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, String> value) throws Exception {
                        return value.f1;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new MyRightOuterJoinFunction());

        joinedStream.print();

        env.execute();
    }
}
package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.util.Collector;

/**
 * Right-outer-join semantics on top of {@code coGroup}: every right element is
 * emitted; when no left element shares its key in the window, the left-hand
 * fields of the output tuple are {@code null}.
 */
public class MyRightOuterJoinFunction implements CoGroupFunction<
        Tuple3<Long, String, String>,   // element type of the left stream
        Tuple3<Long, String, String>,   // element type of the right stream
        Tuple6<Long, String, String, Long, String, String>> { // output type

    @Override
    public void coGroup(Iterable<Tuple3<Long, String, String>> first,
                        Iterable<Tuple3<Long, String, String>> second,
                        Collector<Tuple6<Long, String, String, Long, String, String>> out) throws Exception {
        // Both iterables are fixed for this key/window, so the left side's
        // emptiness can be determined once up front.
        boolean leftIsEmpty = !first.iterator().hasNext();
        for (Tuple3<Long, String, String> right : second) {
            if (leftIsEmpty) {
                // No match in the window: emit the right element padded with nulls.
                out.collect(Tuple6.of(null, null, null, right.f0, right.f1, right.f2));
            } else {
                // Emit the cross product of this right element with every left element.
                for (Tuple3<Long, String, String> left : first) {
                    out.collect(Tuple6.of(left.f0, left.f1, left.f2, right.f0, right.f1, right.f2));
                }
            }
        }
    }
}

4、interval Join

interval join 的条件是两个流中数据的 key 相等,并通过 between 方法设置一条数据相对于另一条流中数据事件时间的存活(可关联)时间范围

package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Demo 4: interval join of two keyed streams.
 *
 * <p>Instead of a window, each element joins with elements of the other stream
 * whose event time lies within a relative time range of its own timestamp
 * (here -1s inclusive to +1s exclusive, see {@code between}/{@code upperBoundExclusive}).
 *
 * <p>NOTE(review): {@code MyProcessJoinFunction} is defined elsewhere in this
 * project and is not shown here.
 */
public class IntervalJoinDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Left input lines, e.g. "1000,A,1"
        DataStreamSource<String> leftLines = env.socketTextStream("localhost", 8888);
        // Right input lines, e.g. "2000,A,2"
        DataStreamSource<String> rightLines = env.socketTextStream("localhost", 9999);

        // Extract the event time (first CSV field) of the first stream;
        // allowed out-of-orderness is 0 seconds.
        DataStream<String> leftWaterMarkStream = leftLines
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        // Extract the event time of the second stream in the same way.
        DataStream<String> rightWaterMarkStream = rightLines
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
        // Parse the first stream into Tuple3<timestamp, key, value>.
        DataStream<Tuple3<Long, String, String>> leftStream = leftWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        // Parse the second stream into Tuple3<timestamp, key, value>.
        DataStream<Tuple3<Long, String, String>> rightStream = rightWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                }
        );
        DataStream<Tuple6<Long, String, String, Long, String, String>> joinedStream = leftStream
                .keyBy(t -> t.f1) // key selector for the first stream
                .intervalJoin(rightStream.keyBy(t -> t.f1)) // interval-join with the keyed second stream
                .between(Time.seconds(-1), Time.seconds(1)) // join window: own timestamp ± 1 second
                .upperBoundExclusive() // bounds are inclusive by default; make the upper bound exclusive
                .process(new MyProcessJoinFunction()); // user-defined ProcessJoinFunction (defined elsewhere)
        joinedStream.print(); // print sink for the joined results
        env.execute("IntervalJoinDemo");
    }
}



举报

相关推荐

0 条评论