0
点赞
收藏
分享

微信扫一扫

Flink Table API及Flink SQL

做个橙梦 2022-04-06 阅读 140
flink

Flink Table API及Flink SQL

文章目录

相关博客:

Flink-Table API 和 Flink SQL简介 | 新老版本Flink批流处理对比 | 读取文件和Kafka消费数据 | API 和 SQL查询表

flink-Table&sql-碰到的几个问题记录

一、概述

  • Flink 对批处理和流处理,提供了统一的上层 API
  • Table API 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询
  • Flink 的 SQL 支持基于实现了 SQL 标准的 Apache Calcite

https://www.notion.so

导入依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

<!-- 写入文件以及jdbc依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.12</artifactId>
    <version>1.10.1</version>
    <scope>provided</scope>
</dependency>

简单使用代码:

package com.root.table;

import com.root.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author Kewei
 * @Date 2022/3/7 16:31
 */

public class TableTest1_Example {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        // 1.创建一个Table环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2.读取文件,并格式化
        String path = "data/sensor.txt";
        DataStreamSource<String> inputStream = env.readTextFile(path);
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        // 3.创建Table
        Table dataTable = tableEnv.fromDataStream(dataStream);
        
        // 4.简单查询
        Table resultTable = dataTable.select("id, temperature")
                .where("id = 'sensor_1'");
        
        // 5. 创建临时视图
        tableEnv.createTemporaryView("sensor", dataTable);
        
        // 6.使用sql查询
        String sql = "select id, temperature from sensor where id = 'sensor_1'";
        Table resultSqlTable = tableEnv.sqlQuery(sql);
        
        // 7.Table转换为DataStream,并打印输出
        tableEnv.toAppendStream(resultTable, Row.class).print("result");
        tableEnv.toAppendStream(resultSqlTable,Row.class).print("sql");

        env.execute();

    }
}

二、基本程序结构

Table API和SQL的程序结构,与流式处理的程序结构十分类似。

StreamTableEnvironment tableEnv = ... // 创建表的执行环境

// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable");

// 注册一张表,用于把计算结果输出,和读取表相同,需要输出时,
// 需要resultTable.insertInto("outputTable");
tableEnv.connect(...).createTemporaryTable("outputTable");

// 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...);

// 通过SQL查询语句,得到一张结果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");

// 将结果表写入输出表中
result.insertInto("outputTable");

三、Table API批处理和流处理

新版本blink,真正把批处理、流处理都以DataStream实现

创建环境 样例代码

package com.root.table;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * @author Kewei
 * @Date 2022/3/7 16:47
 */

public class TableTest2_CommonAPI {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 基于老版本的planner的流处理
        EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance()
                .useOldPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);

        // 基于老版本的planner的批处理
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment oldBatchEnv = BatchTableEnvironment.create(batchEnv);

        // 基于Blink的流处理
        EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);

        // 基于Blink的批处理
        EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings);

    }
}

3.1 表 Table

  • TableEnvironment可以注册目录Catalog,并可以基于Catalog注册表
  • 表(Table)是由一个"标示符"(identifier)来指定的,由3部分组成:Catalog名、数据库(database)名和对象名
  • 表可以是常规的,也可以是虚拟的(视图,View)
  • 常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从DataStream转换而来
  • 视图(View)可以从现有的表中创建,通常是table API或者SQL查询的一个结果集

3.2 创建表

TableEnvironment可以调用connect()方法,连接外部系统,并调用.createTemporaryTable()方法,在Catalog中注册表。

tableEnv
  .connect(...)    //    定义表的数据来源,和外部系统建立连接
  .withFormat(...)    //    定义数据格式化方法 new CSV()
  .withSchema(...)    //    定义表结构
  .createTemporaryTable("MyTable");    //    创建临时表

3.3 创建TableEnvironment

  • 创建表的执行环境,需要将flink流处理的执行环境传入

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

  • TableEnvironment是flink中集成Table API和SQL的核心概念,所有对表的操作都基于TableEnvironment

    • 注册Catalog
    • 在Catalog中注册表
    • 执行SQL查询
    • 注册用户自定义函数(UDF)
package com.root.table;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

/**
 * @author Kewei
 * @Date 2022/3/7 17:15
 */

public class TableTest3_CommonAPI {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 创建Table环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 读取文件,作为数据源
        String path = "data/sensor.txt";
        tableEnv.connect(new FileSystem().path(path))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp",DataTypes.BIGINT())
                        .field("temp",DataTypes.DOUBLE()))
                .createTemporaryTable("inputTable");

        // 查询表
        Table inputTable = tableEnv.from("inputTable");

        // 查看表结构
        inputTable.printSchema();

        // 转换为DataStream,并打印输出
        tableEnv.toAppendStream(inputTable, Row.class).print();

        env.execute();
    }
}

3.4 表的查询

  • Table API是集成在Scala和Java语言内的查询API
  • Table API基于代表"表"的Table类,并提供一整套操作处理的方法API;这些方法会返回一个新的Table对象,表示对输入表应用转换操作的结果
  • 有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构

测试代码

package com.root.table;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

/**
 * @author Kewei
 * @Date 2022/3/7 17:15
 */

public class TableTest4_CommonAPI {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 创建Table环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 读取文件,作为数据源
        String path = "data/sensor.txt";
        tableEnv.connect(new FileSystem().path(path))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp",DataTypes.BIGINT())
                        .field("temp",DataTypes.DOUBLE()))
                .createTemporaryTable("inputTable");
        // 查询表
        Table inputTable = tableEnv.from("inputTable");
//        inputTable.printSchema();
//        tableEnv.toAppendStream(inputTable, Row.class).print();

        // 链式查询,并筛选
        Table resultTable = inputTable.select("id, temp")
                .filter("id === 'sensor_6'");

        // 分组,并聚合
        Table aggTable = inputTable.groupBy("id")
                .select("id, id.count as count, temp.avg as avgTemp");

        // 执行SQL
        tableEnv.sqlQuery("select id, temp from inputTable where id = 'sensor_6'");
        Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");

        // 转换为DataStream,注意聚合之后的Table,需要使用toRetractStream,来转换为DataStream
        // 并且数据中含有true和false,false表示上一条保存的记录被删除,true则是新加入的数据
        tableEnv.toAppendStream(resultTable, Row.class).print("result");
        tableEnv.toRetractStream(aggTable, Row.class).print("agg");
        tableEnv.toRetractStream(sqlAggTable, Row.class).print("sqlagg");

        env.execute();
    }
}

Flink的Table API在更新数据时,实际是先删除原本的数据,再添加新数据

3.5 Table更新模式

  • 对于流式查询,需要声明如何在表和外部连接器之间执行转换
  • 与外部系统交换的消息类型,由更新模式(Uadate Mode)指定
  • 追加(Append)模式
    • 表只做插入操作,和外部连接器只交换插入(Insert)消息
  • 撤回(Retract)模式
    • 表和外部连接器交换添加(Add)和撤回(Retract)消息
    • 插入操作(Insert)编码为Add消息;删除(Delete)编码为Retract消息;更新(Update)编码为上一条的Retract和下一条的Add消息
  • 更新插入(Upsert)模式
    • 更新和插入都被编码为Upsert消息;删除编码为Delete消息

3.6 将数据写到文件中

写入到文件有局限,只能是批处理,且只能是追加写,不能是更新式的随机写。

测试代码

package com.root.table;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

/**
 * @author Kewei
 * @Date 2022/3/7 17:41
 */

public class TableTest5_FileOutput {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String path = "data/sensor.txt";

        tableEnv.connect(new FileSystem().path(path))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE()))
                .createTemporaryTable("inputTable");

        Table resultTable = tableEnv.from("inputTable").select("id, timestamp");
        Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) cnt,avg(temp) avgTemp from inputTable group by id");

        String outputPath = "data/out.txt";
        tableEnv.connect(new FileSystem().path(outputPath))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp",DataTypes.BIGINT()))
                .createTemporaryTable("outputTable");
        resultTable.insertInto("outputTable");

        // 以下报错 AppendStreamTableSink requires that Table has only insert changes.
        // AppendStreamTableSink 需要表只能插入没有修改的数据(不能group by)
//        String outputPath2 = "data/out1.txt";
//        tableEnv.connect(new FileSystem().path(outputPath2))
//                .withFormat(new Csv())
//                .withSchema(new Schema()
//                        .field("id", DataTypes.STRING())
//                        .field("cnt",DataTypes.BIGINT())
//                        .field("avgTemp",DataTypes.DOUBLE()))
//                .createTemporaryTable("outputTable2");
//        sqlAggTable.insertInto("outputTable2");

        env.execute();
        
        // 旧版Flink可以用下面这条
        // env.execute();

        // 新版Flink需要用这条,上面那条会报错,报错如下
        // Exception in thread "main" java.lang.IllegalStateException:
        // No operators defined in streaming topology. Cannot execute.
        // tableEnv.execute("");

    }
}

注意输出到文件时,输出的目标文件不能存在!否则会报错。

3.7 读写Kafka

Kafka作为消息队列,和文件系统类似的,只能往里追加数据,不能修改数据。

tableEnv.connect(new Kafka()
                .version("universal")
                .topic("sensor")
                .property("zookeeper.connect", "localhost:2181")
                .property("bootstrap.servers", "localhost:9092")
        )
        .withFormat(new Csv())
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("timestamp", DataTypes.BIGINT())
                .field("temp", DataTypes.DOUBLE())
        )
        .createTemporaryTable("inputTable");

四、表和流的转换

4.1 将表Table转换成DataStream

  • 表可以转换为 DataStream 或 DataSet ,这样自定义流处理或批处理程序就可以继续在 Table API 或 SQL 查询的结果上运行了

  • 将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型

  • 表作为流式查询的结果,是动态更新的

  • 转换有两种转换模式:追加(Appende)模式和撤回(Retract)模式

  • 追加模式

    • 用于表只会被插入(Insert)操作更改的场景
    DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable,Row.class);
    
  • 撤回模式

    • 用于任何场景。有些类似于更新模式中Retract模式,它只有Insert和Delete两类操作。

    • 得到的数据会增加一个Boolean类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(Delete)

      更新数据,会先删除旧数据,再插入新数据

4.2 将DataStream转换为Table

  • 对于一个DataStream,可以直接转换成Table,进而方便地调用Table API做转换操作

    DataStream<SensorReading> dataStream = ...;
    Table sensorTable = tableEnv.fromDataStream(dataStream);
    
  • 默认转换后的Table schema和DataStream中的字段定义一一对应,也可以单独指定出来

    DataStream<SensorReading> dataStream = ...;
    Table sensorTable = tableEnv.fromDataStream(dataStream,
                                               "id, timestamp as ts, temperature");
    

4.3 创建临时视图**(Temporary View)**

  • 基于DataStream创建临时视图
tableEnv.createTemporaryView("sensorView",dataStream);
tableEnv.createTemporaryView("sensorView",
                            dataStream, "id, timestamp as ts, temperature");
  • 基于Table创建临时视图
tableEnv.createTemporaryView("sensorView", sensorTable);

五、查看执行计划

  • Table API 提供了一种机制来解释计算表的逻辑和优化查询计划
  • 查看执行计划,可以通过TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成,返回一个字符串,描述三个计划
    • 优化的逻辑查询计划
    • 优化后的逻辑查询计划
    • 实际执行计划
String explaination = tableEnv.explain(resultTable);
System.out.println(explaination);

六、 流处理和关系代数的区别

相关博客:

Flink- 将表转换成DataStream | 查看执行计划 | 流处理和关系代数的区别 | 动态表 | 流式持续查询的过程 | 将流转换成动态表 | 持续查询 | 将动态表转换成 DS

Table API和SQL,本质上还是基于关系型表的操作方式;而关系型表、关系代数,以及SQL本身,一般是有界的,更适合批处理的场景。这就导致在进行流处理的过程中,理解会稍微复杂一些,需要引入一些特殊概念。

可以看到,其实关系代数(主要就是指关系型数据库中的表)和SQL,主要就是针对批处理的,这和流处理有天生的隔阂。

关系代数(表)/sql流处理
处理的数据对象字段元组的有界集合字段元组的无限序列
查询(Query)对数据的访问可以访问到完整的数据输入无法访问所有数据,必须持续"等待"流式输入
查询终止条件生成固定大小的结果集后终止永不停止,根据持续收到的数据不断更新查询结果

6.1 动态表(Dynamic Table)

我们可以随着新数据的到来,不停地在之前的基础上更新结果。这样得到的表,在Flink Table API概念里,就叫做“动态表”(Dynamic Tables)。

  • 动态表是 Flink 对流数据的 Table API 和 SQL 支持的核心概念
  • 与表示批处理数据的静态表不同,动态表是随时间变化的
  • 持续查询(Continuous Query)
    • 动态表可以像静态的批处理表一样进行查询,查询一个动态表会产生持续查询(Continuous Query)
    • 连续查询永远不会终止,并会生成另一个动态表
    • 查询(Query)会不断更新其动态结果表,以反映其动态输入表上的更改。

6.2 动态表和持续查询

在这里插入图片描述

流式表查询的处理过程:

  1. 流被转换为动态表
  2. 对动态表计算连续查询,生成新的动态表
  3. 生成的动态表转换成流

6.3 将流转换为动态表

  • 为了处理带有关系查询的流,必须先将其转换为表
  • 从概念上讲,流的每个数据记录,都被解释为对结果表的插入(Insert)修改操作

本质上,我们其实是从一个、只有插入操作的changelog(更新日志)流,来构建一个表来一条数据插入一条数据

在这里插入图片描述

6.4 持续查询

  • 持续查询,会在动态表上做计算处理,并作为结果生成新的动态表。

    与批处理查询不同,连续查询从不终止,并根据输入表上的更新更新其结果表。

    在任何时间点,连续查询的结果在语义上,等同于在输入表的快照上,以批处理模式执行的同一查询的结果。

下图为一个点击事件流的持续查询,是一个分组聚合做count统计的查询。

在这里插入图片描述

6.5 将动态表转换成DataStream

  • 与常规的数据库表一样,动态表可以通过插入(Insert)、更新(Update)和删除(Delete)更改,进行持续的修改
  • 将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码

有三种类型:

  • 仅追加(Append-only)流

    • 仅通过插入(Insert)更改来修改的动态表,可以直接转换为仅追加流
  • 撤回(Retract)流

    • 撤回流是包含两类消息的流:添加(Add)消息和撤回(Retract)消息

    动态表通过将INSERT 编码为add消息、DELETE 编码为retract消息、UPDATE编码为被更改行(前一行)的retract消息和更新后行(新行)的add消息,转换为retract流

  • Upsert(更新插入流)

    • Upsert流也包含两种类型的消息:Upsert消息和删除(Delete)消息

      通过将INSERT和UPDATE更改编码为upsert消息,将DELETE更改编码为DELETE消息,就可以将具有唯一键(Unique Key)的动态表转换为流。

6.6 将动态表转换成DataSream

在这里插入图片描述

举报

相关推荐

0 条评论