0
点赞
收藏
分享

微信扫一扫

【深度好文】Flink SQL流批⼀体化技术详解(二)

持续输出 敬请关注
大数据架构  湖仓一体化  流批一体 离线+实时数仓 
各种大数据解决方案  各种大数据新技术实践
持续输出  敬请关注

【深度好文】Flink SQL流批⼀体化技术详解(一)_大数据研习社的博客-CSDN博客持续输出 敬请关注大数据架构 湖仓一体化 流批一体 离线+实时数仓各种大数据解决方案 各种大数据新技术实践持续输出 敬请关注https://blog.csdn.net/dajiangtai007/article/details/124347634?spm=1001.2014.3001.5501【珍藏版】数仓平台、推荐系统架构选型及解决⽅案_大数据研习社的博客-CSDN博客持续输出 敬请关注大数据架构 湖仓一体化 流批一体 离线+实时数仓各种大数据解决方案 各种大数据新技术实践持续输出 敬请关注https://blog.csdn.net/dajiangtai007/article/details/124042191?spm=1001.2014.3001.5501【珍藏版】⼤数据中台架构及解决⽅案_大数据研习社的博客-CSDN博客_数据中台产品架构持续输出 敬请关注大数据架构 湖仓一体化 流批一体 离线+实时数仓各种大数据解决方案 各种大数据新技术实践持续输出 敬请关注https://blog.csdn.net/dajiangtai007/article/details/123692199?spm=1001.2014.3001.5501新⼀代USDP开源套件,可替代CDH的免费大数据套件平台及架构选型_大数据研习社的博客-CSDN博客持续输出 敬请关注大数据架构 湖仓一体化 流批一体 离线+实时数仓各种大数据解决方案 各种大数据新技术实践持续输出 敬请关注https://blog.csdn.net/dajiangtai007/article/details/123525688?spm=1001.2014.3001.5501⼤数据平台基础架构及解决⽅案_大数据研习社的博客-CSDN博客持续输出 敬请关注大数据架构 湖仓一体化 流批一体 离线+实时数仓各种大数据解决方案 各种大数据新技术实践https://blog.csdn.net/dajiangtai007/article/details/123473705?spm=1001.2014.3001.5501

第2章 Flink Table编程

2.1 依赖管理

2.2 Flink Tabl API基本代码结构

2.2.1 混⽤DataStream和Table API

2.2.2 标准结构

2.3  创建TableEnvironment

2.3.1 TableEnvironment的功能

2.3.2 两种TableEnvironment

2.3.3 修改配置

2.4 创建表和注册表

2.4.1 创建表的两种⽅式

2.4.2 临时表与持久表

2.4.3 表的标识符

【下一篇】Flink SQL流批⼀体化技术详解(三)

第2章 Flink Table编程

2.1 依赖管理

普通maven项⽬,引⼊如下依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.djt</groupId>
<artifactId>flinksql06</artifactId>
<version>1.0-SNAPSHOT</version>
<name>flinksql06</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url><properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<scala.binary.version>2.11</scala.binary.version>
<flink.version>1.14.3</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.github.javafaker/javafaker --><dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.15</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults
(may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/mavencore/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see
https://maven.apache.org/ref/current/maven-core/defaultbindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/mavencore/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

(1)flink-table-planner: planner 计划器,是 table API 最主要的部分,提供了运⾏时环境和⽣成程序执⾏计划的planner;
(2)flink-table-api-java-bridge: bridge 桥接器,主要负责 table API 和 DataStream/DataSet API 的连接⽀持,按照语⾔分 java 和 scala。
(3)这⾥的两个依赖,是 IDE 环境下运⾏需要添加的;如果是⽣产环境, lib⽬录下默认已经有了 planner,就只需要有 bridge 就可以了
(4)如果想使⽤⽤户⾃定义函数,或是跟 kafka 做连接,需要有⼀个 SQL client,这个包含在 flink-table-common⾥。

2.2 Flink Tabl API基本代码结构

2.2.1 混⽤DataStreamTable API

        熟悉DataStream的同学,习惯混⽤DataStream和Table API,下⾯是⼀个例⼦:

//1、获取Stream执⾏环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执⾏环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读取数据
DataStream<ClickLogs> clickLogs = env.fromElements(
"Mary,./home,2022-02-02 12:00:00",
"Bob,./cart,2022-02-02 12:00:00",
"Mary,./prod?id=1,2022-02-02 12:00:05",
"Liz,./home,2022-02-02 12:01:00",
"Bob,./prod?id=3,2022-02-02 12:01:30",
"Mary,./prod?id=7,2022-02-02 12:01:45"
).map(event -> {
String[] props = event.split(",");
return ClickLogs
.builder()
.user(props[0])
.url(props[1])
.cTime(props[2])
.build();
});
//4、流转换为动态表
Table table = tEnv.fromDataStream(clickLogs);
//5、执⾏Table API查询/SQL查询
Table resultTable = table.where($("user").isEqual("Mary"))
.select($("user"), $("url"), $("cTime"));
//6、将Table转换为DataStream
DataStream<ClickLogs> selectedClickLogs = tEnv.toDataStream(resultTable,
ClickLogs.class);
//7、处理结果:打印/输出
selectedClickLogs.print();
//8、执⾏
env.execute("FlinkTableFirstExample");

2.2.2 标准结构

        下⾯是⼀个标准的Table API的例⼦,你可以忘掉DataStream:

//1、创建TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()//Flink1.14开始就删除了其他的执⾏器了,只保留了BlinkPlanner
.inStreamingMode()//默认就是StreamingMode
//.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
//2、创建source table: 1)读取外部表; 2)从Table API或者SQL查询结果创建表
Table projTable = tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
DataTypes.FIELD("name", DataTypes.STRING())
),
row(1, "zhangsan"),
row(2L, "lisi")
).select($("id"), $("name"));
//注册表到catalog(可选的)
tEnv.createTemporaryView("sourceTable", projTable);
//3、创建sink table
final Schema schema = Schema.newBuilder()
.column("id", DataTypes.DECIMAL(10, 2))
.column("name", DataTypes.STRING())
.build();
tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("print")
.schema(schema)
.build());
//4、 Table API执⾏查询(可以执⾏多次查询,中间表可以注册到catalog也可以不注册)
Table resultTable = tEnv.from("sourceTable").select($("id"), $("name"));
// Table resultTable = projTable.select($("id"), $("name"));
//5、输出(包括执⾏,不需要单独在调⽤tEnv.execute("job"))
resultTable.executeInsert("sinkTable");

2.3  创建TableEnvironment

        TableEnvironment是Flink集成Table API/SQL的⼊⼝,有了它才可以执⾏Table API/SQL相关的操作

2.3.1 TableEnvironment的功能

         TableEnvironment主要的功能如下:
(1)注册catalog(可以理解为数据系统实例,⽐如某个HBase集群,某个MySQL服务器)
(2)在catalog注册库和表
(3)加载插件模块
(4)执⾏SQL查询
(5)注册UDF
(6)DataStream和Table互转(仅仅在StreamExecutionEnvironment下)

2.3.2 两种TableEnvironment

1、 TableEnvironment
       标准的做法是创建TableEnvironment,在创建 TableEnv 的时候,可以传⼊⼀个 EnvironmentSettings 或者TableConfig 参数, ⽤来配置 TableEnvironment 的⼀些特性:

EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()//Flink1.14开始就删除了其他的执⾏器了,只保留了BlinkPlanner
.inStreamingMode()//默认就是StreamingMode
//.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);

 注意: Flink1.14开始删除了其他的执⾏器了,只保留了BlinkPlanner
2、 StreamExecutionEnvironment
       如果要混⽤DataStream和Table API/SQL,可以使⽤StreamTableEnvironment

//1、获取Stream执⾏环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执⾏环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

TableEnvironment和StreamExecutionEnvironment⼆选⼀即可。

2.3.3 修改配置

// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");

具体配置项请参考如下链接:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config/

2.4 创建表和注册表

        有了TableEnvironment,我们接下来就需要创建表并注册表,注册表是可选的

2.4.1 创建表的两种⽅式

        创建表是为了在表上执⾏TableAPI或者SQL查询,可以通过如下两种⽅式创建表:

 (1)虚拟的表(VIEWS)

Table projTable = tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
DataTypes.FIELD("name", DataTypes.STRING())
),
row(1, "zhangsan"),
row(2L, "lisi")
).select($("id"), $("name"));
//createTemporaryView
tEnv.createTemporaryView("sourceTable", projTable);

(2)常规的表(TABLES)

final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenOptions.ROWS_PER_SECOND, 100)
.build();
tableEnv.createTable("SourceTableA", sourceDescriptor);
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);
// 也可使⽤SQL DDL
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")

具体的Connector可以参考如下链接,后⾯会讲:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/overview/

2.4.2 临时表与持久表

        注册表是为了把计算的中间结果注册为⼀个表,供同⼀Flink session后续访问或者跨Flink session访问。只有常规的表(TABLES)才分临时表和持久表, 虚拟的表(VIEWS)都是临时表

final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenOptions.ROWS_PER_SECOND, 100)
.build();
//持久表
tableEnv.createTable("SourceTableA", sourceDescriptor);
//临时表
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor)

 2.4.3 表的标识符

        注册表时可以指定唯⼀标识符,格式为:

Catalog(⽬录).database(数据库).objectname(表名)

        如果没有指定Catalog或database,就使⽤当前默认值,即tEnv.useCatalog("...")和tEnv.useDatabase("...")指定的默认值,如果tEnv没有指定,则默认值default_catalog.default_database

TableEnvironment tEnv = ...;
//当前Flink会话切换到custom_catalog.custom_database
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
Table table = ...;
//在custom_catalog.custom_database下,把table注册为⼀个名叫exampleView的视图表
tableEnv.createTemporaryView("exampleView", table);
//在custom_catalog.other_database下,把table注册为⼀个名叫exampleView的视图表
tableEnv.createTemporaryView("other_database.exampleView", table);
//在custom_catalog.custom_database下,把table注册为⼀个名叫exampleView.View的视图表,注意转义字
符
tableEnv.createTemporaryView("`example.View`", table);
//other_catalog.other_database,把table注册为⼀个名叫exampleView的视图表
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);

【下一篇】Flink SQL流批⼀体化技术详解(三)

举报

相关推荐

0 条评论