一 动态表与连续查询
1.1 动态表
1.是flink的支持流数据Table API 和SQL的核心概念。动态表随时间的变化而变化
2.在流上面定义的表在内部是没有数据的
1.2 连续查询
1.永远不会停止,结果是一张动态表
二 Flink SQL
2.1 sql行
1.先启动启动flink集群
2.进入sql命令行
3.创建一张数据源来自于kafka的表
如果退出命令行界面,这个表也不存在了,因为这个表基于内存的
4.执行sql语句
这个结果也是一张动态表
2.2打印结果模式
2.2.1 表格模式(table mode)默认
在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:
这个就是表格模式
2.2.2 变更日志模式(changelog mode)
不会实体化和可视化结果,而是由插入(+
)和撤销(-
)组成的持续查询产生结果流。开启命令:
我在kafka生产端添加了一条理科六班的数据,他表的变化是先加后减然后再加,他就是属于update,有之前的数据更新
我又在kafka生产端添加了一条理科六班1的数据,这张动态表之前没有这个数据,所以他是insert
2.2.3 Tableau模式(tableau mode)
1.更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type
):
2.命令
类似于日志一样,不会新开一个窗口,数据的添加还是跟变更日志模式一样 ,但是添加已有数据的方法是不一样的他是先减后加,他是Retract流,添加之前没有数据还是insert
2.3 连接器
2.3.1 kafka
1.导入依赖,放到flink的lib目录下
1.kafka source
1.是无界流,将Kafka的数据->flink-sql
2.建表语句
2.查询语句
2. kafka sink
非聚合结果
也叫将仅插入的结果
1.建表(非聚合结果的)语句
2.将非聚合查询结果插入的结果写入sink表
3.查看结果需要用到kafka的消费端,也可以使用sql
聚合结果
将更新更改查询结果
指定类型需要指定 'format' = 'canal-json'
1.建表语句
2.将更新更改查询结果写入kafka
3.查询结果
2.3.2 JDBC
1.整合
1.mysql source
1.有界流,将mysql的数据写进flink-sql中
2.字段名称和字段类型需要和数据库中保存一致
3.建表语句
2.mysql sink
1.将flink-sql中的一张表或者查询语句的结果存放在mysql中
2.mysql中表需要提前创建
3.需要增加主键约束,flink会通过主键更新数据
3.mysql中的表也是动态变化的,只要flink中的表变了
4.建表语句
5.查询语句
2.3.3 hdfs
1.hdfs source
有界流表
1.建表语句
无界表
1.他是定时监控一个文件夹下面的文件,所以是无界流
2.需要配置扫描文件夹的间隔时间
3.建表语句
2. hdfs sink
非聚合结果
1.也叫将仅插入的结果
2.建表语句
3.插入语句
集合结果
1.将更新更改查询结果保存到hdfs中
2.指定类型 'format' = 'canal-json'
3.建表语句
4.查询语句
2.3.4 hbase
1.整合
2.hbase最好在存储数据的地方,因为他查询比较麻烦
3.先创建hbase表
4.创建hbase sink表
5.查看结果
在flink-sql查看:select * from students_hbase;
在hbase查看:scan 'students_flink'
2.3.5 datagen
1.用于生成测试数据,可以用于高性能测试,这个数据是随机生成的
2.建表语句
2.3.7 print
1.结果在task manager查看
2.建表语句
3.插入语句
4.结果
2.3.8 BlackHole
1.用于高性能测试
2.建表
3.插入数据
4.没有结果
2.4 处理模式
2.4.1流处理模式
设置参数,直接在flink-sql的客户端直接输入
流处理结果图
2.4.2 批处理模式
设置直接输入
批处理结果图
2.5 数据格式
2.5.1 csv
1.默认是以英文逗号为分隔符
2.数据中字段的顺序需要和建表语句字段的顺序保持一致 (顺序映射)
3.建表语句
2.5.2 json
1.flink表中的字段和类型需要和json中保持一致(同名映射)
2.建表
2.5.3 canal-json
1.用于保存更新更改的结果流(聚合计算的结果保存到其他位置)
2.例子
2.6 时间属性
2.6.1 处理时间
1.PROCTIME() 生成处理时间的函数
2.建表语句
2.查询结果
3. 实时统计每个单词最近5秒单词的数量
2.6.2 事件时间
1.建表语句
注意:这个时间字段一定是时间戳形式,且字段里面有时间的概念。
2.7 sql语法
2.7.1 hints
1.动态表选择:可以在查询表的时候动态修改表的参数配置
2.查询语句
students动态表的参数
2.7.2 with
1.当有一段sql逻辑重复时,可以定义在with语句中,减少代码量
2.sql
2.7.3 SELECT WHERE
1.简单的语句,不必多说
2.7.4 SELECT DISTINCT
对于流处理的问题
1、flink会将之前的数据保存在状态中,用于判断是否重复
2、如果表的数据量很大,随着时间的推移状态会越来越大,状态的数据时先保存在TM的内存中的,时间长了可能会出问题
注意:distinct后面加字段
2.7.5 窗口函数
1.滚动窗口函数
1.建表语句
2.添加数据
3.查询语句
其中TUMBLE:滚动窗口函数,在原表的基础上增加窗口开始时间,窗口结束时间,窗口时间,重新组成一张表
DESCRIPTOR(bidtime)里面传入的是事件时间,INTERVAL '10' MINUTES:窗口的大小
2.滑动窗口函数
1.建表语句
PROCTIME() 生成处理时间的函数
2.添加数据
3.查询语句
HOP:滑动窗口函数,在原表的基础上增加窗口开始时间,窗口结束时间,窗口时间,重新组成一张表
DESCRIPTOR:里面是时间
INTERVAL '5' SECOND, INTERVAL '10' SECOND:每五秒计算窗口为10秒里的数据
3.会话窗口
1.查询语句
再5秒里面,窗口没有数据开始计算
2.7.6 group by
1.分组聚合需要将之前的计算结果保存在状态中,
如果状态无限增长,会导致checkpoint时间拉长,如果checkpoint超时失败了,也会导致任务失败
2.需要在表的参数后面加一个参数 /*+ OPTIONS('fields.word.length'='7') */,可以将这个数字变小一点,状态会变小一点
2.7.7 over
1.sum开窗
1.只能做累加,不能做全局(要有order by)
2.只能按照时间字段升序
3.建表语句
4.插入语句
5.查询语句
2.max,min,avg,count
3. row_number
1.如果只是增加排名,只能按照时间字段升序排序
2.本来开窗的字段只能是时间字段,如果外面接一个子查询,那么就没有限制了
2.7.8 order by
1.考虑计算代价,只能按照时间字段来进行升序(第一个字段必须是时间字段,后面可以是非时间字段)
2. 不考虑计算代价,加上子查询或者limit限制,那么可以是非时间字段
2.7.9 模式监测
1.建表语句
2.插入语句
3.查询语句
定义单个
定义多个,可以使用正则表达式
注意他默认的是匹配策略是SKIP TO NEXT ROW,如果想要修改,直接加参数AFTER MATCH SKIP PAST LAST ROW -- 从当前匹配成功之后的下一行开始匹配
另一个例题
2.7.10 join
1 Regular Joins
1.和hive sql中的join是一样的
inner join 内连接:两张表都有的数据
left join 左连接 :左表有数据显示,右表没有为null
right join 右连接:显示右表有的数据,左表没有的为null
full join:全连接:只显示左右表共有的数据
2. Interval Joins
1.在一段时间内关联,字段必须要有时间戳的时间字段,两张表都是流式的。
proctime这个就是时间字段
3. Temporal Joins
流表关联时态表
1.建表语句
2.常规关联查询
只能取出最新的结果,因为表是动态的
3.动态join
FOR SYSTEM_TIME AS OF a.order_time: 使用a表的时间到b表中查询对应时间段的数据
4. lookup join
1用于流表关联维度表
流表:动态表
维度表:不怎么变化的变,维度表的数据一般可以放在hdfs或者mysql
2.建表语句
3.使用常规关联查询
如果mysql表里面的数据增加的话,刚刚好增加的部分数据能与流表数据关联,但是查询不到
4.lookup join
使用这个的前提是流表得是时间字段,
5.解决 每一次都需要查询数据库,性能会降低的方案
在创建维度表史加2个参数
2.8 整合hive
2.8.1 整合
1.整合
2.8.2 hive catalog
1.catalog--->database--->table---->字段---->数据
catalog是数据库上面的一个概念,一个cataloglog中可以有多个database, catalog就是flink抽象的元数据层
2.default_catalog:是flink默认的元数据,将元数据保存在jobmanager的内存中
3.使用
2.8.3 hive functions
2.9 checkpoint
2.9.1 编写sql文件
1.vim word_count.sql
2.9.2第一次提交
2.9.3 任务失败或者重启
1.基于之前的checkpoint重启任务
2.在inert into 语句的前面增加
3.重新提交
sql-client.sh -f word_count.sql
2.10 多次使用同一张表
1.编写sql文件
2.将查询或者插入的语句放在 EXECUTE STATEMENT SET里面