0
点赞
收藏
分享

微信扫一扫

Flink 广播流Broadcast State 模式

90哦吼 2022-02-18 阅读 79

广播流与普通流JOIN图解

user actions 可以看作是事件流

patterns 为广播流,把全量数据加载到不同的计算节点
在这里插入图片描述

普通双流join
根据join 条件,根据key的发到同一个计算节点,如下图类似

在这里插入图片描述

案例

package com.zxl.broadcasts;

import com.zxl.blink.DataDB;
import com.zxl.blink.Person;
import com.zxl.blink.StudentDB;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;


public class BroadcastDemo {
    public static void main(String[] args) throws Exception {
        //配置FLINK WEB UI 可以登入localhost:8848 查看flink运行图
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT,8848);
        //创建执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        environment.setParallelism(4);
        //调用自定义函数形成streamone
        DataStream<Tuple2<Integer, Long>> studentSource = environment.addSource(new StudentDB());
        //调用自定义函数形成streamtwo
        DataStream<Person> dataStream = environment.addSource(new DataDB());
        // TODO: 2022/2/17 首先使用id将流进行进行分区(keyBy),这能确保相同id的数据会流转到相同的物理机上。
        KeyedStream<Person, Tuple> pidStream= dataStream.keyBy("pid");
        // TODO: 2022/2/17  定义一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构 key的类型和value的类型
        MapStateDescriptor<Integer, Tuple2<Integer, Long>> student = new MapStateDescriptor<>(
                "student",
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.of(new TypeHint<Tuple2<Integer, Long>>() {
                })
        );
        // TODO: 2022/2/17   广播流,广播规则并且创建 broadcast state
        BroadcastStream<Tuple2<Integer, Long>> broadcast = studentSource.broadcast(student);
        // TODO: 2022/2/17 将两个流关联起来 完成匹配
        // // KeyedBroadcastProcessFunction 中的类型参数表示:
        //   1. key stream 中的 key 类型
        // ** KeyedStream泛型里面的第二个值是key,第一个值是元素类型
        //   2. 非广播流中的元素类型
        //   3. 广播流中的元素类型
        //   4. 结果的类型,在这里是 string
        DataStream<Tuple5<Integer, Long, Integer, String, String>> streamOperator = pidStream.connect(broadcast)
                .process(new KeyedBroadcastProcessFunction<Tuple, Person, Tuple2<Integer, Long>, Tuple5<Integer, Long, Integer, String, String>>() {

                    //定义map 状态来存储广播流数据
                    MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor;

                    //初始化state
                    //在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同。
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        mapStateDescriptor = new MapStateDescriptor<>("student", BasicTypeInfo.INT_TYPE_INFO, TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {}));
                    }

                    // processBroadcastElement() 负责处理广播流的元素
                    @Override
                    public void processElement(Person person, ReadOnlyContext ctx, Collector<Tuple5<Integer, Long, Integer, String, String>> out) throws Exception {
                        // 处理每一个元素,看state是否有匹配的,有的话,下发到下一个节点
                        ReadOnlyBroadcastState<Integer, Tuple2<Integer, Long>> state = ctx.getBroadcastState(mapStateDescriptor);
                        if ((person.getPid() != null && state.get(person.getPid()) != null)) {
                            System.out.println("匹配到" + person.toString());
                            out.collect(new Tuple5<Integer, Long, Integer, String, String>(person.getPid(), state.get(person.getPid()).f1, person.getPage(), person.getPname(), person.getPsex()));
                        }
                    }

                    // processElement() 负责处理另一个流的元素
                    @Override
                    public void processBroadcastElement(Tuple2<Integer, Long> input, Context ctx, Collector<Tuple5<Integer, Long, Integer, String, String>> out) throws Exception {
                        // 新增加的广播元素,放入state中
                        //System.out.println("新增加需要监控的" + input.toString());
                        ctx.getBroadcastState(mapStateDescriptor).put(input.f0, input);
                    }
                });
        //打印结果
        streamOperator.print("broadcast");
        //执行程序
        environment.execute();
    }
}

在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同。
也就是描述了用于存储规则的和初始化state用于存储广播变量的名称一定要相同,不然就会报错。

举报

相关推荐

0 条评论