广播流与普通流JOIN图解
user actions 可以看作是事件流
patterns 为广播流,其全量数据会被广播(复制)到每一个并行计算节点
普通双流join
根据 join 条件,将 key 相同的数据发送到同一个计算节点,类似下图所示
案例
package com.zxl.broadcasts;
import com.zxl.blink.DataDB;
import com.zxl.blink.Person;
import com.zxl.blink.StudentDB;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
public class BroadcastDemo {
public static void main(String[] args) throws Exception {
//配置FLINK WEB UI 可以登入localhost:8848 查看flink运行图
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT,8848);
//创建执行环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
environment.setParallelism(4);
//调用自定义函数形成streamone
DataStream<Tuple2<Integer, Long>> studentSource = environment.addSource(new StudentDB());
//调用自定义函数形成streamtwo
DataStream<Person> dataStream = environment.addSource(new DataDB());
// TODO: 2022/2/17 首先使用id将流进行进行分区(keyBy),这能确保相同id的数据会流转到相同的物理机上。
KeyedStream<Person, Tuple> pidStream= dataStream.keyBy("pid");
// TODO: 2022/2/17 定义一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构 key的类型和value的类型
MapStateDescriptor<Integer, Tuple2<Integer, Long>> student = new MapStateDescriptor<>(
"student",
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.of(new TypeHint<Tuple2<Integer, Long>>() {
})
);
// TODO: 2022/2/17 广播流,广播规则并且创建 broadcast state
BroadcastStream<Tuple2<Integer, Long>> broadcast = studentSource.broadcast(student);
// TODO: 2022/2/17 将两个流关联起来 完成匹配
// // KeyedBroadcastProcessFunction 中的类型参数表示:
// 1. key stream 中的 key 类型
// ** KeyedStream泛型里面的第二个值是key,第一个值是元素类型
// 2. 非广播流中的元素类型
// 3. 广播流中的元素类型
// 4. 结果的类型,在这里是 string
DataStream<Tuple5<Integer, Long, Integer, String, String>> streamOperator = pidStream.connect(broadcast)
.process(new KeyedBroadcastProcessFunction<Tuple, Person, Tuple2<Integer, Long>, Tuple5<Integer, Long, Integer, String, String>>() {
//定义map 状态来存储广播流数据
MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor;
//初始化state
//在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同。
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
mapStateDescriptor = new MapStateDescriptor<>("student", BasicTypeInfo.INT_TYPE_INFO, TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {}));
}
// processBroadcastElement() 负责处理广播流的元素
@Override
public void processElement(Person person, ReadOnlyContext ctx, Collector<Tuple5<Integer, Long, Integer, String, String>> out) throws Exception {
// 处理每一个元素,看state是否有匹配的,有的话,下发到下一个节点
ReadOnlyBroadcastState<Integer, Tuple2<Integer, Long>> state = ctx.getBroadcastState(mapStateDescriptor);
if ((person.getPid() != null && state.get(person.getPid()) != null)) {
System.out.println("匹配到" + person.toString());
out.collect(new Tuple5<Integer, Long, Integer, String, String>(person.getPid(), state.get(person.getPid()).f1, person.getPage(), person.getPname(), person.getPsex()));
}
}
// processElement() 负责处理另一个流的元素
@Override
public void processBroadcastElement(Tuple2<Integer, Long> input, Context ctx, Collector<Tuple5<Integer, Long, Integer, String, String>> out) throws Exception {
// 新增加的广播元素,放入state中
//System.out.println("新增加需要监控的" + input.toString());
ctx.getBroadcastState(mapStateDescriptor).put(input.f0, input);
}
});
//打印结果
streamOperator.print("broadcast");
//执行程序
environment.execute();
}
}
在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同。
也就是说,调用 broadcast() 时使用的 MapStateDescriptor,与在处理函数中获取广播状态时使用的 MapStateDescriptor,名称必须一致(key/value 类型也应保持一致),否则会报错。