Flink Sql join汇总总结
1 有界join
1.1 Window Join
window join就是将两条流划分出时间窗口,数据已经被划分为窗口,无界数据变为有界数据,就和离线批处理的方式一样了,两个窗口的数据简单的进行关联即可,窗口结束就把数据下发下去,关联到的数据就下发 [A, B],没有关联到的数据取决于是否是 outer join 然后进行数据下发。
1.1.1 DataStream API
flinkEnv.env()
// A 流
.addSource(new SourceFunction<Object>() {
@Override
public void run(SourceContext<Object> ctx) throws Exception {
}
@Override
public void cancel() {
}
})
// B 流
.join(flinkEnv.env().addSource(new SourceFunction<Object>() {
@Override
public void run(SourceContext<Object> ctx) throws Exception {
}
@Override
public void cancel() {
}
}))
// A 流的 keyby 条件
.where(new KeySelector<Object, Object>() {
@Override
public Object getKey(Object value) throws Exception {
return null;
}
})
// B 流的 keyby 条件
.equalTo(new KeySelector<Object, Object>() {
@Override
public Object getKey(Object value) throws Exception {
return null;
}
})
// 开窗口
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
// 窗口中关联到的数据的处理逻辑
.apply(new JoinFunction<Object, Object, Object>() {
@Override
public Object join(Object first, Object second) throws Exception {
return null;
}
});
上述解决方案只支持 inner join,即窗口内能关联到的才会下发,关联不到的则直接丢掉。
如果你想实现 window 上的 outer join,可以使用 coGroup 算子,案例如下:
public class CogroupFunctionDemo02 {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
// A 流
DataStream<Tuple2<String,String>> input1=env.socketTextStream("",9002)
.map(new MapFunction<String, Tuple2<String,String>>() {
@Override
public Tuple2<String,String> map(String s) throws Exception {
return Tuple2.of(s.split(" ")[0],s.split(" ")[1]);
}
});
// B 流
DataStream<Tuple2<String,String>> input2=env.socketTextStream("",9001)
.map(new MapFunction<String, Tuple2<String,String>>() {
@Override
public Tuple2<String,String> map(String s) throws Exception {
return Tuple2.of(s.split(" ")[0],s.split(" ")[1]);
}
});
// A 流关联 B 流
input1.coGroup(input2)
// A 流的 keyby 条件
.where(new KeySelector<Tuple2<String,String>, Object>() {
@Override
public Object getKey(Tuple2<String, String> value) throws Exception {
return value.f0;
}
}).equalTo(new KeySelector<Tuple2<String,String>, Object>() {
// B 流的 keyby 条件
@Override
public Object getKey(Tuple2<String, String> value) throws Exception {
return value.f0;
}
})
// 窗口
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(3)))
.apply(new CoGroupFunction<Tuple2<String,String>, Tuple2<String,String>, Object>() {
// 可以自定义实现 A 流和 B 流在关联不到时的输出数据格式
@Override
public void coGroup(Iterable<Tuple2<String, String>> iterable, Iterable<Tuple2<String, String>> iterable1, Collector<Object> collector) throws Exception {
StringBuffer buffer=new StringBuffer();
buffer.append("DataStream frist:\n");
for(Tuple2<String,String> value:iterable){
buffer.append(value.f0+"=>"+value.f1+"\n");
}
buffer.append("DataStream second:\n");
for(Tuple2<String,String> value:iterable1){
buffer.append(value.f0+"=>"+value.f1+"\n");
}
collector.collect(buffer.toString());
}
}).print();
env.execute();
}
}
或者你还可以使用 connect 算子自定义各种关联操作(connect 算子相比 join、coGroup 算子灵活很多):
// (userEvent, userId)
KeyedStream<UserEvent, String> customerUserEventStream = env
.addSource(kafkaUserEventSource)
.assignTimestampsAndWatermarks(new CustomWatermarkExtractor(Time.hours(24)))
.keyBy(new KeySelector<UserEvent, String>() {
@Override
public String getKey(UserEvent userEvent) throws Exception {
return userEvent.getUserId();
}
});
//customerUserEventStream.print();
final BroadcastStream<Config> configBroadcastStream = env
.addSource(kafkaConfigEventSource)
.broadcast(configStateDescriptor);
final FlinkKafkaProducer010 kafkaProducer = new FlinkKafkaProducer010<EvaluatedResult>(
params.get(OUTPUT_TOPIC),
new EvaluatedResultSerializationSchema(),
producerProps);
DataStream<EvaluatedResult> connectedStream = customerUserEventStream
.connect(configBroadcastStream)
.process(new ConnectedBroadcastProcessFuntion());
1.1.2 SQL
SELECT
L.num as L_Num
, L.id as L_Id
, R.num as R_Num
, R.id as R_Id
, L.window_start
, L.window_end
FROM (
SELECT *
FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L
FULL JOIN (
SELECT *
FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R
ON L.num = R.num
AND L.window_start = R.window_start
AND L.window_end = R.window_end;
1.1.3 window join总结
当我们的窗口大小划分的越细时,在窗口边缘关联不上的数据就会越多,数据质量就越差。窗口大小划分的越宽时,窗口内关联上的数据就会越多,数据质量越好,但是产出时效性就会越差。所以小伙伴萌在使用时要注意取舍。
举个例子:以曝光关联点击来说,如果我们划分的时间窗口为 1 分钟,那么一旦出现曝光在 0:59,点击在 1:01 的情况,就会关联不上,当我们的划分的时间窗口 1 小时时,只有在每个小时的边界处的数据才会出现关联不上的情况。
该种解决方案适用于可以评估出窗口内的关联率高的场景,如果窗口内关联率不高则不建议使用。
注意:这种方案由于上面说到的数据质量和时效性问题在实际生产环境中很少使用。
1.2 Interval Join
其也是将两条流的数据从无界数据变为有界数据,但是这里的有界和上节说到的 Flink Window Join 的有界的概念是不一样的,这里的有界是指两条流之间的有界。
以 A 流 join B 流举例,interval join 可以让 A 流可以关联 B 流一段时间区间内的数据,比如 A 流关联 B 流前后 5 分钟的数据。
数据已经被划分为窗口,无界数据变为有界数据,就和离线批处理的方式一样了,两个窗口的数据简单的进行关联即可。窗口结束(这里的窗口结束是指 interval 区间结束,区间的结束是利用 watermark 来判断的)就把数据下发下去,关联到的数据就下发 [A, B],没有关联到的数据取决于是否是 outer join 然后进行数据下发。
- 时间区间JOIN:让一条流去JOIN另一条流的前后一段时间内的数据,INTERVAL JOIN可以避免回撤流的产生,在某些场景下,下游输出系统不具备处理回撤流的能力,此时可以借助INTERVAL JOIN
- INNER INTERVAL JOIN:只有两条流 JOIN 到(满足ON中的条件:两条流的数据在时间区间 + 满足其他等值条件)才输出,输出 +[L, R]
- LEFT INTERVAL JOIN:流任务中,左流数据到达之后,如果没有JOIN到右流的数据,就会等待(放在 State 中等),如果之后右流之后数据到达之后,发现能和刚刚那条左流数据 JOIN 到,则会输出+[L, R]。事件时间中随着 Watermark 的推进, 如果发现发现左流 State 中的数据过期了,就把左流中过期的数据从 State 中删除,然后输出+[L, R],如果右流 State 中的数据过期了,就直接从 State 中删除
- RIGHT INTERVAL JOIN:处理逻辑和LEFT INTERVAL JOIN类似
- FULL INTERVAL JOIN:流任务中,左流或者右流的数据到达之后,如果没有 Join 到另外一条流的数据,就会等待(左流放在左流对应的 State 中等,右流放在右流对应的 State 中等),如果之后另一条流数据到达之后,发现能和刚刚那条数据 Join 到,则会输出+[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间),发现 State 中的数据能够过期了,就将这些数据从 State 中删除并且输出(左流过期输出 +[L, NULL],右流过期输出 -[NULL, R])
1.2.1 DataStream API
clickRecordStream
.keyBy(record -> record.getMerchandiseId())
.intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
// 定义 interval 的时间区间
.between(Time.seconds(-30), Time.seconds(30))
.process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
@Override
public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
collector.collect(StringUtils.join(Arrays.asList(
accessRecord.getMerchandiseId(),
orderRecord.getPrice(),
orderRecord.getCouponMoney(),
orderRecord.getRebateAmount()
), '\t'));
}
})
.print();
1.2.2 SQL
CREATE TABLE show_log_table (
log_id BIGINT,
show_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.show_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE click_log_table (
log_id BIGINT,
click_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.click_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table FULL JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '5' SECOND AND click_log_table.row_time
1.2.3 Interval Join总结
interval join 的方案比 window join 方案在数据质量上好很多,但是其也是存在 join 不到的情况的。并且如果为 outer join 的话,outer 一测的流数据需要要等到区间结束才能下发。
该种解决方案适用于两条流之间可以明确评估出相互延迟的时间是多久的,这里我们可以使用离线数据进行评估,使用离线数据的两条流的时间戳做差得到一个分布区间。
比如在 A 流和 B 流时间戳相差在 1min 之内的有 95%,在 1-4 min 之内的有 4.5%,则我们就可以认为两条流数据时间相差在 4 min 之内的有 99.5%,这时我们将上下界设置为 4min 就是一个能保障 0.5% 误差的合理区间。
注意:这种方案在生产环境中还是比较常用的。
1.3 Temporary Join
首先介绍一个时态表的概念,这是一个随时间不断变化的动态表,它可能包含表的多个快照。对于时态表中的记录,可以追踪、访问其历史版本的表称为版本表,如数据库的 changeLog;只能追踪、访问最新版本的表称为普通表,如数据库的表。举个例子,外汇订单金额计算,要计算当时的汇率来汇总,这时汇率表用时态表就很合适。
1.3.1 DataStream API
import org.apache.flink.table.functions.TemporalTableFunction;
(...)
// 获取 stream 和 table 环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 提供一个汇率历史记录表静态数据集
List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
ratesHistoryData.add(Tuple2.of("US Dollar", 102L));
ratesHistoryData.add(Tuple2.of("Euro", 114L));
ratesHistoryData.add(Tuple2.of("Yen", 1L));
ratesHistoryData.add(Tuple2.of("Euro", 116L));
ratesHistoryData.add(Tuple2.of("Euro", 119L));
// 用上面的数据集创建并注册一个示例表
// 在实际设置中,应使用自己的表替换它
DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, $("r_currency"), $("r_rate"), $("r_proctime").proctime());
tEnv.createTemporaryView("RatesHistory", ratesHistory);
// 创建和注册时态表函数
// 指定 "r_proctime" 为时间属性,指定 "r_currency" 为主键
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); // <==== (1)
tEnv.registerFunction("Rates", rates);
1.3.2 SQL
"CREATE TABLE dim_source (" +
" id STRING," +
" name STRING," +
" update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, " +
" WATERMARK FOR update_time AS update_time, " +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'upsert-kafka'," +
" 'topic' = 'flinksqldim'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'properties.group.id' = 'flinksqlDim'," +
" 'key.format' = 'json'," +
" 'value.format' = 'json')"
定义时态表 要求 (1)主键(2)WATERMARK
1.4 LoopUp Join
在关联维度表时。JDBC 连接器可以用在时态表关联中作为一个可 lookup 的 source (又称为维表),当前只支持同步的查找模式。
默认情况下,lookup cache 是未启用的,你可以设置 lookup.cache.max-rows and lookup.cache.ttl 参数来启用。
lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性能。默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。
当 lookup cache 被启用时,每个进程(即 TaskManager)将维护一个缓存。
Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。
当缓存命中最大缓存行 lookup.cache.max-rows 或当行超过最大存活时间 lookup.cache.ttl 时,缓存中最老的行将被设置为已过期。
缓存中的记录可能不是最新的,用户可以将 lookup.cache.ttl 设置为一个更小的值以获得更好的刷新数据,但这可能会增加发送到数据库的请求数。
所以要做好吞吐量和正确性之间的平衡。
CREATE TEMPORARY TABLE mysql_behavior_conf (
id int
,code STRING
,map_val STRING
,update_time TIMESTAMP(3)
-- ,primary key (id) not enforced
-- ,WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'jdbc'
,'url' = 'jdbc:mysql://localhost:3306/venn'
,'table-name' = 'lookup_join_config'
,'username' = 'root'
,'password' = '123456'
,'lookup.cache.max-rows' = '1000'
,'lookup.cache.ttl' = '1 minute' -- 缓存时间,即使一直在访问也会删除
);
其实本质上跟使用异步IO加缓存实现的效果相同,lookup join底层也是使用guava 的 LocalCache做缓存
2 无界join
2.1 Regular Join
regular join 还是基于无界数据进行关联,以 A 流 left join B 流举例,A 流数据到来之后,直接去尝试关联 B 流数据。
- 如果关联到了则直接下发关联到的数据
- 如果没有关联到则也直接下发没有关联到的数据,后续 B 流中的数据到来之后,会把之前下发下去的没有关联到数据撤回,然后把关联到的数据数据进行下发。由此可以看出这是基于 Flink SQL 的 retract 机制,则也就说明了其目前只支持 Flink SQL。
两条流的数据会尝试关联,能关联到直接下发,关联不到先下发一个目前的结果数据。
2.1.1 SQL
CREATE TABLE show_log_table (
log_id BIGINT,
show_params STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.show_params.length' = '3',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE click_log_table (
log_id BIGINT,
click_params STRING
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.click_params.length' = '3',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table
LEFT JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
-
实时REGULAR JOIN支持等值JOIN和不等值JOIN,等值JOIN SHUFFLE策略是HASH,非等值JOIN策略是GLOBAL,所有数据发往一个并发,按照非等值条件进行关联
-
REGULAR JOIN会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此需要为 State 配置合适的 TTL,以防止 State 过大
数据质量和时效性高的原因都是因为 regular join 会保障目前 Flink 任务已经接收到的数据中能关联的一定是关联上的,即使关联不上,数据也会下发,完完全全保障了当前数据的客观性和时效性。
2.1.2 Regular Join总结
该种解决方案虽然是目前在产出质量、时效性上最好的一种解决方案,但是在实际场景中使用时,也存在一些问题:
- 基于 retract 机制,所有的数据都会存储在 state 中以判断能否关联到,所以我们要设置合理的 state ttl 来避免大 state 问题导致的任务不稳定
- 基于 retract 机制,所以在数据发生更新时,会下发回撤数据、最新数据 2 条消息,当我们的关联层级越多,则下发消息量的也会放大,并且会出现数据回撤导致的udf失效 ,及去重问题。
- sink 组件要支持 retract,我们不要忘了最终数据是要提供数据服务给需求方进行使用的,所以我们最终写入的数据组件也需要支持 retract,比如 MySQL。如果写入的是 Kafka,则下游消费这个 Kafka 的引擎也需要支持回撤\更新机制。
3 join优化方案
但是我们可以发现,无论是哪一种 Join 方案,Join 的前提都是将 A 流和 B 流的数据先存储在状态中,然后再进行关联。
即在实际生产中使用时常常会碰到的问题就是:大状态的问题。
关于大状态问题业界常见两种解决思路:
- 减少状态大小:在 Flink Join 中的可以想到的优化措施就是减少 state key 的数量。在未优化之前 A 流和 B 流的数据往往是存储在单独的两个 State 实例中的,那么我们的优化思路就是将同 Key 的数据放在一起进行存储,一个 key 的数据只需要存储一份,减少了 key 的数量
- 转移状态至外存:大 State 会导致 Flink 任务不稳定,那么我们就将 State 存储在外存中,让 Flink 任务轻量化,比如将数据存储在 Redis 中,A 流和 B 流中相同 key 的数据共同维护在一个 Redis 的 hashmap 中,以供相互进行关联
3.1 key相同时共用state
将两条流的数据使用 union、connect 算子合并在一起,然后使用一个共享的 state 进行处理。
FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
flinkEnv.env().setParallelism(1);
flinkEnv.env()
.addSource(new SourceFunction<Object>() {
@Override
public void run(SourceContext<Object> ctx) throws Exception {
}
@Override
public void cancel() {
}
})
.keyBy(new KeySelector<Object, Object>() {
@Override
public Object getKey(Object value) throws Exception {
return null;
}
})
.connect(flinkEnv.env().addSource(new SourceFunction<Object>() {
@Override
public void run(SourceContext<Object> ctx) throws Exception {
}
@Override
public void cancel() {
}
}).keyBy(new KeySelector<Object, Object>() {
@Override
public Object getKey(Object value) throws Exception {
return null;
}
}))
// 左右两条流的数据
.process(new KeyedCoProcessFunction<Object, Object, Object, Object>() {
// 两条流的数据共享一个 mapstate 进行处理
private transient MapState<String, String> mapState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, String>("a", String.class, String.class));
}
@Override
public void processElement1(Object value, Context ctx, Collector<Object> out) throws Exception {
}
@Override
public void processElement2(Object value, Context ctx, Collector<Object> out) throws Exception {
}
})
.print();
3.2 state过大优化
定期清理state,比如在曝光关联点击的情况下,如果我们能明确一次曝光只有一次点击的话,只要这条曝光或者点击被关联到过,那么我们就可以在 KeyedCoProcessFunction 中自定义逻辑将已经被关联过得曝光、点击的 state 数据进行删除,以减小 state,减轻任务压力。
3.3 使用外部存储保存state
外存 State 到 redis。
此种方案就是完全不使用 Flink 的 state,直接将来的数据存储到 Redis 中进行维护,A 流的数据过来之后,去 Redis 中找 B 流的数据,B 流的数据过来之后,去 Redis 中找 A 流的数据。
某些金融公司内的关联,state 是不能被清理的,比如存储了借款信息之后,这些信息后续还是可能被修改的。所以这种场景下需要存储全量的 state。
引用
https://www.cnblogs.com/baran/p/15950363.html
https://mp.weixin.qq.com/s/66FyBdXaPtAZHqRXrgPrjQ