0
点赞
收藏
分享

微信扫一扫

Java 集合框架:ArrayList 的介绍、使用、原理与源码解析

大明宫 2024-06-17 阅读 6

一 动态表与连续查询

1.1 动态表

1.是flink的支持流数据Table API 和SQL的核心概念。动态表随时间的变化而变化

2.在流上面定义的表在内部是没有数据的

1.2 连续查询

1.永远不会停止,结果是一张动态表

 

 

二 Flink SQL

2.1 sql行

1.先启动启动flink集群

 2.进入sql命令行

 3.创建一张数据源来自于kafka的表

如果退出命令行界面,这个表也不存在了,因为这个表基于内存的

4.执行sql语句

这个结果也是一张动态表 

2.2打印结果模式

2.2.1 表格模式(table mode)默认

在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:

 这个就是表格模式

2.2.2 变更日志模式(changelog mode)

不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。开启命令:

 我在kafka生产端添加了一条理科六班的数据,他表的变化是先加后减然后再加,他就是属于update,有之前的数据更新

我又在kafka生产端添加了一条理科六班1的数据,这张动态表之前没有这个数据,所以他是insert

2.2.3 Tableau模式(tableau mode)

1.更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type):

2.命令

类似于日志一样,不会新开一个窗口,数据的添加还是跟变更日志模式一样 ,但是添加已有数据的方法是不一样的他是先减后加,他是Retract流,添加之前没有数据还是insert

2.3 连接器

2.3.1 kafka

1.导入依赖,放到flink的lib目录下

 1.kafka source

1.是无界流,将Kafka的数据->flink-sql

2.建表语句

 2.查询语句

2. kafka sink
非聚合结果

也叫将仅插入的结果

1.建表(非聚合结果的)语句

2.将非聚合查询结果插入的结果写入sink表

3.查看结果需要用到kafka的消费端,也可以使用sql

聚合结果

将更新更改查询结果

指定类型需要指定  'format' = 'canal-json'

1.建表语句

2.将更新更改查询结果写入kafka

3.查询结果

2.3.2 JDBC

1.整合

1.mysql source

1.有界流,将mysql的数据写进flink-sql中

2.字段名称和字段类型需要和数据库中保存一致

3.建表语句

2.mysql sink

1.将flink-sql中的一张表或者查询语句的结果存放在mysql中

2.mysql中表需要提前创建

3.需要增加主键约束,flink会通过主键更新数据

3.mysql中的表也是动态变化的,只要flink中的表变了

4.建表语句

5.查询语句

2.3.3 hdfs

1.hdfs source
有界流表

1.建表语句

无界表

1.他是定时监控一个文件夹下面的文件,所以是无界流

2.需要配置扫描文件夹的间隔时间

3.建表语句

2. hdfs sink
 非聚合结果

1.也叫将仅插入的结果

2.建表语句

3.插入语句

集合结果

1.将更新更改查询结果保存到hdfs中

2.指定类型  'format' = 'canal-json'

3.建表语句

4.查询语句

2.3.4 hbase

1.整合

2.hbase最好在存储数据的地方,因为他查询比较麻烦

3.先创建hbase表

4.创建hbase sink表

5.查看结果

在flink-sql查看:select * from students_hbase;

在hbase查看:scan 'students_flink'

2.3.5 datagen

1.用于生成测试数据,可以用于高性能测试,这个数据是随机生成的

2.建表语句

2.3.7 print

1.结果在task manager查看

2.建表语句

3.插入语句

4.结果

2.3.8 BlackHole

1.用于高性能测试

2.建表

3.插入数据

4.没有结果

2.4 处理模式

2.4.1流处理模式

设置参数,直接在flink-sql的客户端直接输入

流处理结果图

2.4.2 批处理模式

设置直接输入

批处理结果图

 

2.5 数据格式

2.5.1 csv

1.默认是以英文逗号为分隔符

2.数据中字段的顺序需要和建表语句字段的顺序保持一致 (顺序映射)

3.建表语句

2.5.2 json

1.flink表中的字段和类型需要和json中保持一致(同名映射)

2.建表

2.5.3 canal-json

1.用于保存更新更改的结果流(聚合计算的结果保存到其他位置)

2.例子

2.6 时间属性

2.6.1 处理时间

1.PROCTIME() 生成处理时间的函数

2.建表语句

2.查询结果

3. 实时统计每个单词最近5秒单词的数量

2.6.2 事件时间

1.建表语句

注意:这个时间字段一定是时间戳形式,且字段里面有时间的概念。

 2.7 sql语法

2.7.1 hints

1.动态表选择:可以在查询表的时候动态修改表的参数配置

2.查询语句

students动态表的参数

2.7.2 with

1.当有一段sql逻辑重复时,可以定义在with语句中,减少代码量

2.sql

 

 2.7.3 SELECT WHERE

1.简单的语句,不必多说

2.7.4 SELECT DISTINCT

对于流处理的问题
1、flink会将之前的数据保存在状态中,用于判断是否重复
2、如果表的数据量很大,随着时间的推移状态会越来越大,状态的数据时先保存在TM的内存中的,时间长了可能会出问题

注意:distinct后面加字段

2.7.5 窗口函数

1.滚动窗口函数

1.建表语句

2.添加数据

3.查询语句

其中TUMBLE:滚动窗口函数,在原表的基础上增加窗口开始时间,窗口结束时间,窗口时间,重新组成一张表

DESCRIPTOR(bidtime)里面传入的是事件时间,INTERVAL '10' MINUTES:窗口的大小

2.滑动窗口函数

1.建表语句

 PROCTIME() 生成处理时间的函数

2.添加数据

3.查询语句

HOP:滑动窗口函数,在原表的基础上增加窗口开始时间,窗口结束时间,窗口时间,重新组成一张表

DESCRIPTOR:里面是时间

INTERVAL '5' SECOND, INTERVAL '10' SECOND:每五秒计算窗口为10秒里的数据

3.会话窗口

1.查询语句

再5秒里面,窗口没有数据开始计算

 2.7.6 group by

1.分组聚合需要将之前的计算结果保存在状态中,
如果状态无限增长,会导致checkpoint时间拉长,如果checkpoint超时失败了,也会导致任务失败

2.需要在表的参数后面加一个参数 /*+ OPTIONS('fields.word.length'='7') */,可以将这个数字变小一点,状态会变小一点

2.7.7 over

1.sum开窗

1.只能做累加,不能做全局(要有order by)

2.只能按照时间字段升序

3.建表语句

4.插入语句

5.查询语句

 

2.max,min,avg,count
3. row_number

1.如果只是增加排名,只能按照时间字段升序排序

2.本来开窗的字段只能是时间字段,如果外面接一个子查询,那么就没有限制了

 2.7.8 order by

1.考虑计算代价,只能按照时间字段来进行升序(第一个字段必须是时间字段,后面可以是非时间字段)

2. 不考虑计算代价,加上子查询或者limit限制,那么可以是非时间字段

2.7.9 模式监测

1.建表语句

2.插入语句

3.查询语句

定义单个

定义多个,可以使用正则表达式

 注意他默认的是匹配策略是SKIP TO NEXT ROW,如果想要修改,直接加参数AFTER MATCH SKIP PAST LAST ROW -- 从当前匹配成功之后的下一行开始匹配

另一个例题 

2.7.10 join

1 Regular Joins

1.和hive sql中的join是一样的

inner join 内连接:两张表都有的数据

left join 左连接 :左表有数据显示,右表没有为null

right join 右连接:显示右表有的数据,左表没有的为null

full join:全连接:只显示左右表共有的数据

2. Interval Joins

1.在一段时间内关联,字段必须要有时间戳的时间字段,两张表都是流式的。

proctime这个就是时间字段

3. Temporal Joins

流表关联时态表

1.建表语句

2.常规关联查询

 只能取出最新的结果,因为表是动态的

 3.动态join

FOR SYSTEM_TIME AS OF a.order_time: 使用a表的时间到b表中查询对应时间段的数据

 

4. lookup join

1用于流表关联维度表
流表:动态表
维度表:不怎么变化的变,维度表的数据一般可以放在hdfs或者mysql

2.建表语句

 3.使用常规关联查询

如果mysql表里面的数据增加的话,刚刚好增加的部分数据能与流表数据关联,但是查询不到 

4.lookup join

使用这个的前提是流表得是时间字段,

 5.解决 每一次都需要查询数据库,性能会降低的方案

在创建维度表史加2个参数

2.8 整合hive

2.8.1 整合

1.整合

 2.8.2 hive catalog

1.catalog--->database--->table---->字段---->数据

catalog是数据库上面的一个概念,一个cataloglog中可以有多个database, catalog就是flink抽象的元数据层

2.default_catalog:是flink默认的元数据,将元数据保存在jobmanager的内存中

3.使用

2.8.3 hive functions 

 2.9 checkpoint

2.9.1 编写sql文件

1.vim word_count.sql

2.9.2第一次提交

2.9.3 任务失败或者重启

1.基于之前的checkpoint重启任务

2.在inert into 语句的前面增加

3.重新提交 

sql-client.sh -f word_count.sql 

2.10 多次使用同一张表

1.编写sql文件

2.将查询或者插入的语句放在 EXECUTE STATEMENT SET里面

2.11 反压

2.11.1 测试反压

2.11.2 解决反压

1.增加资源
2.预聚合
举报

相关推荐

0 条评论