0
点赞
收藏
分享

微信扫一扫

Flink双流connect的独特使用方式

荷一居茶生活 2022-03-11 阅读 62

业务需求:可动态更新本地内存缓存信息

因有一些配置信息原来是存在redis中的,但是发现flink集群与redis集群网络有较大延迟,这样导致flink处理速度上不去,有消费挤压与连接报错等问题。

解决方案:通过双流connect连接两个kafka信息,一个做主数据流处理,一个做缓存信息更新处理,通过processBroadcastElement的广播特性,将redis信息更新至内存中,实现快速访问。

代码:

//初始化主 kafka消费者
FlinkKafkaConsumer myConsumer = new FlinkKafkaConsumer<KafkaMsgBo>(topicList, new KafkaMsgDeserializationSchema(),
        RentReductionConsumerConfig.getPropertiesByPara(***,****,***));

//初始化redis-kafka消费者
List<String> custPerConfigList = FlinkKafkaConsumerCommon.getTopicsList(***);
FlinkKafkaConsumer<KafkaMsgBo> custPerConfigConsumer = new FlinkKafkaConsumer(custPerConfigList,
        new KafkaMsgDeserializationSchema(),
        RentReductionConsumerConfig.getPropertiesByPara222(***,****,***));
//主kafka数据源
DataStream<KafkaMsgBo> stream = env.addSource(myConsumer);
//redis-kafka数据源
BroadcastStream<String> custPerConfigStream = env.addSource(custPerConfigConsumer).map(kafkaMsgBo -> {
    return new String(kafkaMsgBo.getMessage(), "UTF-8");
}).broadcast(configStateDescriptor);
// connect above 2 streams
DataStream<List<String>> connectedStream = filterStream
        .connect(custPerConfigStream)
        .process(new ConnectedBroadcastProcessFuntion());
ConnectedBroadcastProcessFuntion extends BroadcastProcessFunction后实现processBroadcastElement方案
//定义全局缓存变量
private CustomerPerceptionConfig customerPerceptionConfig;
//广播配置消息处理
@Override
public void processBroadcastElement(String jsonStr, Context context, Collector<List<String>> collector) throws Exception {
    //获取相应redis数据存入全局变量中
    initRedis();
}

此方案优缺点:

优点:解决频繁访问redis造成挤压、网络延迟等问题,提高flink实时校验处理速度。

缺点:本地内存大小有限制

举报

相关推荐

0 条评论