Flink Table API及Flink SQL
文章目录
相关博客:
Flink-Table API 和 Flink SQL简介 | 新老版本Flink批流处理对比 | 读取文件和Kafka消费数据 | API 和 SQL查询表
flink-Table&sql-碰到的几个问题记录
一、概述
- Flink 对批处理和流处理,提供了统一的上层 API
- Table API 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询
- Flink 的 SQL 支持基于实现了 SQL 标准的 Apache Calcite
https://www.notion.so
导入依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<!-- 写入文件以及jdbc依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.10.1</version>
<scope>provided</scope>
</dependency>
简单使用代码:
package com.root.table;
import com.root.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* @author Kewei
* @Date 2022/3/7 16:31
*/
public class TableTest1_Example {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1.创建一个Table环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2.读取文件,并格式化
String path = "data/sensor.txt";
DataStreamSource<String> inputStream = env.readTextFile(path);
SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 3.创建Table
Table dataTable = tableEnv.fromDataStream(dataStream);
// 4.简单查询
Table resultTable = dataTable.select("id, temperature")
.where("id = 'sensor_1'");
// 5. 创建临时视图
tableEnv.createTemporaryView("sensor", dataTable);
// 6.使用sql查询
String sql = "select id, temperature from sensor where id = 'sensor_1'";
Table resultSqlTable = tableEnv.sqlQuery(sql);
// 7.Table转换为DataStream,并打印输出
tableEnv.toAppendStream(resultTable, Row.class).print("result");
tableEnv.toAppendStream(resultSqlTable,Row.class).print("sql");
env.execute();
}
}
二、基本程序结构
Table API和SQL的程序结构,与流式处理的程序结构十分类似。
StreamTableEnvironment tableEnv = ... // 创建表的执行环境
// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable");
// 注册一张表,用于把计算结果输出,和读取表相同,需要输出时,
// 需要resultTable.insertInto("outputTable");
tableEnv.connect(...).createTemporaryTable("outputTable");
// 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...);
// 通过SQL查询语句,得到一张结果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");
// 将结果表写入输出表中
result.insertInto("outputTable");
三、Table API批处理和流处理
新版本blink,真正把批处理、流处理都以DataStream实现
创建环境 样例代码
package com.root.table;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
/**
* @author Kewei
* @Date 2022/3/7 16:47
*/
public class TableTest2_CommonAPI {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 基于老版本的planner的流处理
EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);
// 基于老版本的planner的批处理
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment oldBatchEnv = BatchTableEnvironment.create(batchEnv);
// 基于Blink的流处理
EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);
// 基于Blink的批处理
EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();
TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings);
}
}
3.1 表 Table
- TableEnvironment可以注册目录Catalog,并可以基于Catalog注册表
- 表(Table)是由一个"标示符"(identifier)来指定的,由3部分组成:Catalog名、数据库(database)名和对象名
- 表可以是常规的,也可以是虚拟的(视图,View)
- 常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从DataStream转换而来
- 视图(View)可以从现有的表中创建,通常是table API或者SQL查询的一个结果集
3.2 创建表
TableEnvironment可以调用connect()
方法,连接外部系统,并调用.createTemporaryTable()
方法,在Catalog中注册表。
tableEnv
.connect(...) // 定义表的数据来源,和外部系统建立连接
.withFormat(...) // 定义数据格式化方法 new CSV()
.withSchema(...) // 定义表结构
.createTemporaryTable("MyTable"); // 创建临时表
3.3 创建TableEnvironment
-
创建表的执行环境,需要将flink流处理的执行环境传入
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
-
TableEnvironment是flink中集成Table API和SQL的核心概念,所有对表的操作都基于TableEnvironment
- 注册Catalog
- 在Catalog中注册表
- 执行SQL查询
- 注册用户自定义函数(UDF)
package com.root.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
/**
* @author Kewei
* @Date 2022/3/7 17:15
*/
public class TableTest3_CommonAPI {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 创建Table环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 读取文件,作为数据源
String path = "data/sensor.txt";
tableEnv.connect(new FileSystem().path(path))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temp",DataTypes.DOUBLE()))
.createTemporaryTable("inputTable");
// 查询表
Table inputTable = tableEnv.from("inputTable");
// 查看表结构
inputTable.printSchema();
// 转换为DataStream,并打印输出
tableEnv.toAppendStream(inputTable, Row.class).print();
env.execute();
}
}
3.4 表的查询
- Table API是集成在Scala和Java语言内的查询API
- Table API基于代表"表"的Table类,并提供一整套操作处理的方法API;这些方法会返回一个新的Table对象,表示对输入表应用转换操作的结果
- 有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构
测试代码
package com.root.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
/**
* @author Kewei
* @Date 2022/3/7 17:15
*/
public class TableTest4_CommonAPI {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 创建Table环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 读取文件,作为数据源
String path = "data/sensor.txt";
tableEnv.connect(new FileSystem().path(path))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temp",DataTypes.DOUBLE()))
.createTemporaryTable("inputTable");
// 查询表
Table inputTable = tableEnv.from("inputTable");
// inputTable.printSchema();
// tableEnv.toAppendStream(inputTable, Row.class).print();
// 链式查询,并筛选
Table resultTable = inputTable.select("id, temp")
.filter("id === 'sensor_6'");
// 分组,并聚合
Table aggTable = inputTable.groupBy("id")
.select("id, id.count as count, temp.avg as avgTemp");
// 执行SQL
tableEnv.sqlQuery("select id, temp from inputTable where id = 'sensor_6'");
Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");
// 转换为DataStream,注意聚合之后的Table,需要使用toRetractStream,来转换为DataStream
// 并且数据中含有true和false,false表示上一条保存的记录被删除,true则是新加入的数据
tableEnv.toAppendStream(resultTable, Row.class).print("result");
tableEnv.toRetractStream(aggTable, Row.class).print("agg");
tableEnv.toRetractStream(sqlAggTable, Row.class).print("sqlagg");
env.execute();
}
}
Flink的Table API在更新数据时,实际是先删除原本的数据,再添加新数据
3.5 Table更新模式
- 对于流式查询,需要声明如何在表和外部连接器之间执行转换
- 与外部系统交换的消息类型,由更新模式(Uadate Mode)指定
- 追加(Append)模式
- 表只做插入操作,和外部连接器只交换插入(Insert)消息
- 撤回(Retract)模式
- 表和外部连接器交换添加(Add)和撤回(Retract)消息
- 插入操作(Insert)编码为Add消息;删除(Delete)编码为Retract消息;更新(Update)编码为上一条的Retract和下一条的Add消息
- 更新插入(Upsert)模式
- 更新和插入都被编码为Upsert消息;删除编码为Delete消息
3.6 将数据写到文件中
写入到文件有局限,只能是批处理,且只能是追加写,不能是更新式的随机写。
测试代码
package com.root.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
/**
* @author Kewei
* @Date 2022/3/7 17:41
*/
public class TableTest5_FileOutput {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String path = "data/sensor.txt";
tableEnv.connect(new FileSystem().path(path))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE()))
.createTemporaryTable("inputTable");
Table resultTable = tableEnv.from("inputTable").select("id, timestamp");
Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) cnt,avg(temp) avgTemp from inputTable group by id");
String outputPath = "data/out.txt";
tableEnv.connect(new FileSystem().path(outputPath))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT()))
.createTemporaryTable("outputTable");
resultTable.insertInto("outputTable");
// 以下报错 AppendStreamTableSink requires that Table has only insert changes.
// AppendStreamTableSink 需要表只能插入没有修改的数据(不能group by)
// String outputPath2 = "data/out1.txt";
// tableEnv.connect(new FileSystem().path(outputPath2))
// .withFormat(new Csv())
// .withSchema(new Schema()
// .field("id", DataTypes.STRING())
// .field("cnt",DataTypes.BIGINT())
// .field("avgTemp",DataTypes.DOUBLE()))
// .createTemporaryTable("outputTable2");
// sqlAggTable.insertInto("outputTable2");
env.execute();
// 旧版Flink可以用下面这条
// env.execute();
// 新版Flink需要用这条,上面那条会报错,报错如下
// Exception in thread "main" java.lang.IllegalStateException:
// No operators defined in streaming topology. Cannot execute.
// tableEnv.execute("");
}
}
注意输出到文件时,输出的目标文件不能存在!否则会报错。
3.7 读写Kafka
Kafka作为消息队列,和文件系统类似的,只能往里追加数据,不能修改数据。
tableEnv.connect(new Kafka()
.version("universal")
.topic("sensor")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable");
四、表和流的转换
4.1 将表Table转换成DataStream
-
表可以转换为 DataStream 或 DataSet ,这样自定义流处理或批处理程序就可以继续在 Table API 或 SQL 查询的结果上运行了
-
将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型
-
表作为流式查询的结果,是动态更新的
-
转换有两种转换模式:追加(Appende)模式和撤回(Retract)模式
-
追加模式
- 用于表只会被插入(Insert)操作更改的场景
DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable,Row.class);
-
撤回模式
-
用于任何场景。有些类似于更新模式中Retract模式,它只有Insert和Delete两类操作。
-
得到的数据会增加一个Boolean类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(Delete)。
更新数据,会先删除旧数据,再插入新数据
-
4.2 将DataStream转换为Table
-
对于一个DataStream,可以直接转换成Table,进而方便地调用Table API做转换操作
DataStream<SensorReading> dataStream = ...; Table sensorTable = tableEnv.fromDataStream(dataStream);
-
默认转换后的Table schema和DataStream中的字段定义一一对应,也可以单独指定出来
DataStream<SensorReading> dataStream = ...; Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature");
4.3 创建临时视图**(Temporary View)**
- 基于DataStream创建临时视图
tableEnv.createTemporaryView("sensorView",dataStream);
tableEnv.createTemporaryView("sensorView",
dataStream, "id, timestamp as ts, temperature");
- 基于Table创建临时视图
tableEnv.createTemporaryView("sensorView", sensorTable);
五、查看执行计划
- Table API 提供了一种机制来解释计算表的逻辑和优化查询计划
- 查看执行计划,可以通过
TableEnvironment.explain(table)
方法或TableEnvironment.explain()
方法完成,返回一个字符串,描述三个计划- 优化的逻辑查询计划
- 优化后的逻辑查询计划
- 实际执行计划
String explaination = tableEnv.explain(resultTable);
System.out.println(explaination);
六、 流处理和关系代数的区别
相关博客:
Flink- 将表转换成DataStream | 查看执行计划 | 流处理和关系代数的区别 | 动态表 | 流式持续查询的过程 | 将流转换成动态表 | 持续查询 | 将动态表转换成 DS
Table API和SQL,本质上还是基于关系型表的操作方式;而关系型表、关系代数,以及SQL本身,一般是有界的,更适合批处理的场景。这就导致在进行流处理的过程中,理解会稍微复杂一些,需要引入一些特殊概念。
可以看到,其实关系代数(主要就是指关系型数据库中的表)和SQL,主要就是针对批处理的,这和流处理有天生的隔阂。
关系代数(表)/sql | 流处理 | |
---|---|---|
处理的数据对象 | 字段元组的有界集合 | 字段元组的无限序列 |
查询(Query)对数据的访问 | 可以访问到完整的数据输入 | 无法访问所有数据,必须持续"等待"流式输入 |
查询终止条件 | 生成固定大小的结果集后终止 | 永不停止,根据持续收到的数据不断更新查询结果 |
6.1 动态表(Dynamic Table)
我们可以随着新数据的到来,不停地在之前的基础上更新结果。这样得到的表,在Flink Table API概念里,就叫做“动态表”(Dynamic Tables)。
- 动态表是 Flink 对流数据的 Table API 和 SQL 支持的核心概念
- 与表示批处理数据的静态表不同,动态表是随时间变化的
- 持续查询(Continuous Query)
- 动态表可以像静态的批处理表一样进行查询,查询一个动态表会产生持续查询(Continuous Query)
- 连续查询永远不会终止,并会生成另一个动态表
- 查询(Query)会不断更新其动态结果表,以反映其动态输入表上的更改。
6.2 动态表和持续查询
流式表查询的处理过程:
- 流被转换为动态表
- 对动态表计算连续查询,生成新的动态表
- 生成的动态表转换成流
6.3 将流转换为动态表
- 为了处理带有关系查询的流,必须先将其转换为表
- 从概念上讲,流的每个数据记录,都被解释为对结果表的插入(Insert)修改操作
本质上,我们其实是从一个、只有插入操作的changelog(更新日志)流,来构建一个表来一条数据插入一条数据
6.4 持续查询
-
持续查询,会在动态表上做计算处理,并作为结果生成新的动态表。
与批处理查询不同,连续查询从不终止,并根据输入表上的更新更新其结果表。
在任何时间点,连续查询的结果在语义上,等同于在输入表的快照上,以批处理模式执行的同一查询的结果。
下图为一个点击事件流的持续查询,是一个分组聚合做count统计的查询。
6.5 将动态表转换成DataStream
- 与常规的数据库表一样,动态表可以通过插入(Insert)、更新(Update)和删除(Delete)更改,进行持续的修改
- 将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码
有三种类型:
-
仅追加(Append-only)流
- 仅通过插入(Insert)更改来修改的动态表,可以直接转换为仅追加流
-
撤回(Retract)流
- 撤回流是包含两类消息的流:添加(Add)消息和撤回(Retract)消息
动态表通过将INSERT 编码为add消息、DELETE 编码为retract消息、UPDATE编码为被更改行(前一行)的retract消息和更新后行(新行)的add消息,转换为retract流。
-
Upsert(更新插入流)
-
Upsert流也包含两种类型的消息:Upsert消息和删除(Delete)消息
通过将INSERT和UPDATE更改编码为upsert消息,将DELETE更改编码为DELETE消息,就可以将具有唯一键(Unique Key)的动态表转换为流。
-