0
点赞
收藏
分享

微信扫一扫

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(7) - 广播变量-Broadcast Variable


文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、广播变量Broadcast Variables示例
  • 1、介绍
  • 2、广播变量示例
  • 3、验证
  • 三、Broadcast State 与 Broadcast Variable 区别


本文简单的介绍了flink中关于广播变量的简单使用示例。

一、maven依赖

为避免篇幅过长,所有基础依赖均在第一篇文章中列出,具体依赖参考文章
【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(1) - File、Socket、console中的依赖

下文中具体需要的依赖将在介绍时添加新增的依赖。

二、广播变量Broadcast Variables示例

1、介绍

可以将数据广播到TaskManager上就可以供TaskManager中的SubTask/task去使用,数据存储到内存中。这样可以减少大量的shuffle操作,而不需要多次传递给集群节点。比如在数据join阶段,可以把其中一个dataSet广播出去,一直加载到taskManager的内存中,可以直接在内存中拿数据,避免了大量的shuffle,导致集群性能下降。

广播变量允许您使数据集可用于操作的所有并行实例,以及操作的常规输入。这对于辅助数据集或依赖数据的参数化非常有用。然后,操作员可以将数据集作为集合进行访问。

  • 广播:通过withBroadcastSet(DataSet,String)按名称注册广播集,以及
  • Access:可通过目标运算符处的getRuntimeContext().getBroadcastVariable(String)进行访问。

图示广播的工作方式

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(7) - 广播变量-Broadcast Variable_flink hive


官方示例

// 1. The DataSet to be broadcast
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
      // 3. Access the broadcast DataSet as a Collection
      Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    }


    @Override
    public String map(String value) throws Exception {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet

在注册和访问广播数据集时,请确保名称(上一示例中的broadcastSetName)匹配。

由于广播变量的内容保存在每个节点的内存中,因此不应变得太大。对于标量值等更简单的事情,您可以简单地将参数作为函数闭包的一部分,或者使用withParameters(…)方法传入配置。

2、广播变量示例

本示例实现上一个缓存示例一样的内容,不过是使用广播实现的。
该示例比较简单,实现逻辑与分布式缓存基本上一样。

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;

/**
 * @author alanchan
 *
 */
public class TestBroadcastVariablesDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Source
		// student数据集(学号,姓名)
		DataSource<Tuple2<Integer, String>> studentDS = env.fromCollection(Arrays.asList(Tuple2.of(1, "alan"), Tuple2.of(2, "alanchan"), Tuple2.of(3, "alanchanchn")));

		// score数据集(学号,学科,成绩)
		DataSource<Tuple3<Integer, String, Integer>> scoreDS = env.fromCollection(
				Arrays.asList(Tuple3.of(1, "chinese", 50), Tuple3.of(1, "math", 90), Tuple3.of(1, "english", 90), Tuple3.of(2, "math", 70), Tuple3.of(3, "art", 86)));

		// Transformation
		// 将studentDS(学号,姓名)集合广播出去(广播到各个TaskManager内存中)
		// 然后使用scoreDS(学号,学科,成绩)和广播数据studentDS(学号,姓名)进行关联,得到这样格式的数据:(学号,姓名,学科,成绩)
		MapOperator<Tuple3<Integer, String, Integer>, Tuple4<Integer, String, String, Integer>> result = scoreDS
				.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple4<Integer, String, String, Integer>>() {

					Map<Integer, String> studentsMap = new HashMap<>();

					@Override
					public void open(Configuration parameters) throws Exception {
						// 获取广播数据
						List<Tuple2<Integer, String>> studentList = getRuntimeContext().getBroadcastVariable("studentsInfo");
						for (Tuple2<Integer, String> tuple : studentList) {
							studentsMap.put(tuple.f0, tuple.f1);
						}

					}

					@Override
					public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception {
						// 使用广播数据
						Integer stuId = value.f0;
						String stuName = studentsMap.getOrDefault(stuId, "");

						return new Tuple4(stuId, stuName, value.f1, value.f2);
					}
				}).withBroadcastSet(studentDS, "studentsInfo");
		
		// 4.Sink
		result.print();

	}

}

3、验证

启动程序,运行程序,控制台输出如下:

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(7) - 广播变量-Broadcast Variable_flink kafka_02

三、Broadcast State 与 Broadcast Variable 区别

关于Broadcast State的介绍,请参考文章:53、Flink 的Broadcast State 模式介绍及示例

Broadcast State 和 Broadcast Variable 都是 Flink 中用于广播数据的机制,但它们之间有一些区别:

  • Broadcast State 是 KeyedStateBackend 的一个实现,它允许将状态数据广播到所有并行任务中。每个并行任务都可以访问相同的状态数据,从而实现状态的共享。Broadcast State 主要用于处理键控的状态,即状态与某个键相关联。
  • Broadcast Variable 是一种简单的广播机制,它可以将任意类型的数据广播到所有并行任务中。每个并行任务都可以访问相同的广播变量值。Broadcast Variable 主要用于处理非键控的数据,即不需要与特定键关联的数据。

总结一下,Broadcast State 和 Broadcast Variable 的主要区别在于:

  • Broadcast State 用于广播键控的状态数据,而 Broadcast Variable 用于广播非键控的数据。
  • Broadcast State 需要与 KeyedStream 一起使用,而 Broadcast Variable 可以与任何类型的 DataStream 一起使用。

以上,本文简单的介绍了flink中关于广播变量的简单使用示例。


举报

相关推荐

0 条评论