0
点赞
收藏
分享

微信扫一扫

VMware安装Centos 6.5系统

Flink学习笔记

喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!


文章目录

四、FlinkSQL 中 Catalog 操作

1. 初始化操作

// Step 1: instantiate the Hive catalog from named settings.
String defaultDatabase = "default";          // default database
String hiveConfDir = "src/main/resources";   // directory containing hive-site.xml
String hiveVersion = "2.1.1";                // Hive version
HiveCatalog catalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, hiveVersion);

// Step 2: register the catalog with the table environment under its name.
tableEnv.registerCatalog(catalogName, catalog);

// Step 3: make the registered catalog the current one.
tableEnv.useCatalog(catalogName);

2. 数据库操作

// 1. Create a database. The trailing `true` is the ignoreIfExists flag,
//    so the call is a no-op when the database is already present.
catalog.createDatabase(
    databaseName,
    new CatalogDatabaseImpl(
        new HashMap<>(),   // database properties (none here)
        "my comment"),     // database comment
    true);

// 2. Drop a database. `true` is the ignoreIfNotExists flag.
catalog.dropDatabase(databaseName, true);

// 3. Check whether a database exists.
catalog.databaseExists(databaseName);

// 4. List the databases of the catalog.
catalog.listDatabases();

3. 数据表操作

// Tables are addressed by an ObjectPath(databaseName, objectName) throughout the Catalog API.

// 1. Create a table (last argument: ignoreIfExists).
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// 2. Drop a table (last argument: ignoreIfNotExists).
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

// 3. Alter a table (last argument: ignoreIfNotExists).
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// 4. Rename a table (last argument: ignoreIfNotExists).
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);

// 5. Get a table.
catalog.getTable(new ObjectPath("mydb", "mytable"));

// 6. Check whether a table exists.
catalog.tableExists(new ObjectPath("mydb", "mytable"));

// 7. List all tables of a database.
catalog.listTables("mydb");

4. 视图操作

// Views share the table methods of the Catalog API; they are addressed by
// ObjectPath(databaseName, viewName) and described by a CatalogViewImpl.

// 1. Create a view (last argument: ignoreIfExists).
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

// 2. Drop a view (last argument: ignoreIfNotExists).
catalog.dropTable(new ObjectPath("mydb", "myview"), false);

// 3. Alter a view (last argument: ignoreIfNotExists).
catalog.alterTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

// 4. Rename a view (last argument: ignoreIfNotExists).
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);

// 5. Get a view.
catalog.getTable(new ObjectPath("mydb", "myview"));

// 6. Check whether a view exists.
catalog.tableExists(new ObjectPath("mydb", "myview"));

// 7. List all views of a database.
catalog.listViews("mydb");

5. 分区操作

// 1. Create a partition (last argument: ignoreIfExists).
catalog.createPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// 2. Drop a partition (last argument: ignoreIfNotExists).
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);

// 3. Alter a partition (last argument: ignoreIfNotExists).
catalog.alterPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// 4. Get a partition.
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// 5. Check whether a partition exists.
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// 6. List all partitions of a table.
catalog.listPartitions(new ObjectPath("mydb", "mytable"));

// 7. List the partitions of a table that fall under the given partition spec.
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// 8. List the partitions of a table that match expression filters.
//    The Catalog interface exposes this as listPartitionsByFilter, not as a listPartitions overload.
catalog.listPartitionsByFilter(new ObjectPath("mydb", "mytable"), Arrays.asList(expr1, ...));

6. 函数操作

// Functions are addressed by an ObjectPath(databaseName, functionName) throughout the Catalog API.

// 1. Create a function (last argument: ignoreIfExists).
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// 2. Drop a function (last argument: ignoreIfNotExists).
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);

// 3. Alter a function (last argument: ignoreIfNotExists).
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// 4. Get a function.
catalog.getFunction(new ObjectPath("mydb", "myfunc"));

// 5. Check whether a function exists.
catalog.functionExists(new ObjectPath("mydb", "myfunc"));

// 6. List the functions of a database.
catalog.listFunctions("mydb");

五、FlinkSQL 流处理

1. 时间特性

1.1 处理时间(Processing Time)

简介:机器在本地生成的时间,不需要提取时间戳,也不需要水印!

1.1.1 在建表的 DDL 中指定

例子:文件系统建表

package cn.itcast.day01.time;

/**
 * @author lql
 * @time 2024-03-15 21:14:58
 * @description TODO
 */

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

/**
 * Uses processing time in Flink SQL by declaring the attribute in the CREATE TABLE DDL.
 *
 * <p>The program registers a Hive catalog, creates and switches to the working database,
 * defines a filesystem-backed CSV table whose {@code pt} column is computed with
 * {@code PROCTIME()}, and prints every row of that table.
 */
public class ProcessingTimeTableDDL {
    /**
     * Runs the example.
     *
     * @param args unused
     * @throws Exception if the job fails
     * @throws IllegalStateException if {@code order.csv} is not on the classpath
     */
    public static void main(String[] args) throws Exception {
        // Step 0: user name used for Hadoop operations
        System.setProperty("HADOOP_USER_NAME", "root");

        // Step 1: create the streaming environment and the table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bbSetting = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, bbSetting);

        // Step 2: run with a parallelism of 1
        env.setParallelism(1);

        // Step 3: locate the input file on the classpath; fail with a clear message instead of an NPE
        java.net.URL orderCsv = ProcessingTimeTableDDL.class.getClassLoader().getResource("order.csv");
        if (orderCsv == null) {
            throw new IllegalStateException("order.csv not found on the classpath");
        }
        String path = orderCsv.getPath();

        // Step 4: create the HiveCatalog
        String catalogName = "myHive";
        String databaseName = "itcast_flinksql";
        HiveCatalog catalog = new HiveCatalog(
                catalogName,          // catalog name
                "default",            // default database
                "src/main/resources", // directory containing hive-site.xml
                "2.1.1"               // Hive version
        );

        // Step 5: register the catalog
        System.out.println("===========注册目录==================");
        tabEnv.registerCatalog(catalogName, catalog);

        // Step 6: switch to the catalog
        System.out.println("===========切换目录==================");
        tabEnv.useCatalog(catalogName);

        // Step 7: create the database if it is missing
        System.out.println("===========创建数据库==================");
        String createDBSql = "CREATE DATABASE IF NOT EXISTS "+catalogName+"."+databaseName;
        tabEnv.executeSql(createDBSql);

        // Step 8: switch to the database
        System.out.println("===========切换数据库==================");
        tabEnv.useDatabase(databaseName);

        // Step 9: create the table over the file. The table lives in the Hive catalog, so
        // IF NOT EXISTS keeps a second run from failing on the already-existing table,
        // mirroring the database statement above.
        String sqlDDL =  "create table if not exists InputTable (" +
                "                  `id` varchar," +
                "                  `timestamp` bigint," +
                "                  `money` double," +
                "                  `category` varchar," +
                "                  `pt` AS PROCTIME()" +
                "                  ) with (" +
                "                  'connector' = 'filesystem'," +
                "                  'path' = 'file:///"+path+"'," +
                "                  'format' = 'csv'" +
                "                  )";

        tabEnv.executeSql(sqlDDL);
        Table resultTable = tabEnv.sqlQuery("select * from InputTable ");
        resultTable.printSchema();

        // Step 10: convert the table to an append stream, print it, and run the job
        tabEnv.toAppendStream(resultTable, Row.class).print("result");
        env.execute();
    }
}

总结:在建表的时候增加一列:pt AS PROCTIME(),注意 timestamp 需要为 BIGINT 类型!


1.1.2 DataStream 转为 Table 时指定

例子:文件流转化为表

package cn.itcast.day01.time;

/**
 * @author lql
 * @time 2023-06-28 22:10:38
 * @description TODO
 */
import cn.itcast.day01.example.DataStreamToTable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.ConnectTableDescriptor;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Uses processing time in Flink SQL by declaring the attribute while a DataStream
 * is converted into a Table.
 *
 * <p>Lines of {@code order.csv} are parsed into {@link OrderBean}s, converted into a table
 * with an appended {@code pt} processing-time column, and the rows whose category is
 * "电脑" are selected once through the Table API and once through SQL.
 */
public class ProcessingTimeDataStream {
    /**
     * Runs the example.
     *
     * @param args unused
     * @throws Exception if the job fails
     * @throws IllegalStateException if {@code order.csv} is not on the classpath
     */
    public static void main(String[] args) throws Exception {
        // Step 1: create the streaming environment and the table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, bsSettings);

        // Step 2: run with a parallelism of 1
        env.setParallelism(1);

        // Step 3: read the input file, resolved through this class's own class loader;
        // fail with a clear message instead of an NPE when the resource is missing
        java.net.URL orderCsv = ProcessingTimeDataStream.class.getClassLoader().getResource("order.csv");
        if (orderCsv == null) {
            throw new IllegalStateException("order.csv not found on the classpath");
        }
        DataStreamSource<String> inputDataStream = env.readTextFile(orderCsv.getPath());

        // Step 4: parse each comma-separated line into a POJO
        SingleOutputStreamOperator<OrderBean> orderDataStream = inputDataStream.map(new MapFunction<String, OrderBean>() {
            @Override
            public OrderBean map(String value) throws Exception {
                String[] dataArray = value.split(",");
                return new OrderBean(dataArray[0], dataArray[1], Double.parseDouble(dataArray[2]), dataArray[3]);
            }
        });

        // Step 5: convert the stream into a table.
        // The proctime attribute must not replace an existing field, so it is appended as the new column "pt".
        Table table = tabEnv.fromDataStream(orderDataStream, $("id"), $("timestamp"), $("money"), $("category"),$("pt").proctime());
        table.printSchema();

        // Step 6: query the table with the Table API
        Table tableResult = table
                .select($("id"), $("timestamp"), $("money"), $("category"))
                .filter($("category").isEqual("电脑"));

        // Step 7: query the same data with SQL
        // 7.1: register the table object as a temporary view
        tabEnv.createTemporaryView("orderTable", table);
        // 7.2: run the query against the view
        Table sqlResult = tabEnv.sqlQuery("select id,`timestamp`,money,category from orderTable where category='电脑'");

        // Step 8: a Table has no print method, so convert both results back to DataStreams to print them.
        // The Table API result was previously built but never emitted.
        tabEnv.toAppendStream(tableResult, Row.class).print("Table API>>>");
        tabEnv.toAppendStream(sqlResult, Row.class).print("SQL API>>>");

        // Step 9: run the job
        env.execute();

    }

    /** One order record parsed from a CSV line: id, timestamp, money, category. */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderBean{
        private String id;
        private String timestamp;
        private Double money;
        private String category;
    }
}

总结:转化为表指定字段时,添加一列$("pt").proctime()

Table table = tabEnv.fromDataStream(orderDataStream, $("id"), $("timestamp"), $("money"), $("category"),$("pt").proctime());

1.2 事件时间(Event Time)
1.2.1 在建表的 DDL 中指定

例子:文件系统建表

package cn.itcast.day02.time;

/**
 * @author lql
 * @time 2024-03-15 12:31:52
 * @description TODO
 */

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Event-time attribute declared in the CREATE TABLE DDL.
 *
 * <p>The table reads {@code order.csv} through the filesystem connector, derives the
 * {@code rt} column from the {@code timestamp} column, and declares a watermark on
 * {@code rt} that lags one second behind it.
 */
public class EventTimeTableDDL {
    public static void main(String[] args) throws Exception {
        // Streaming environment, parallelism 1
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);

        // Table environment: Blink planner in streaming mode
        EnvironmentSettings blinkStreaming = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, blinkStreaming);

        // Build the DDL; the input file is resolved from the classpath
        String csvPath = EventTimeTableDDL.class.getClassLoader().getResource("order.csv").getPath();
        String createTableSql = String.join("\n",
                "CREATE TABLE InputTable (",
                "  `id` varchar,",
                "  `timestamp` bigint,",
                "  `money` double,",
                "  `category` varchar,",
                "  `rt` AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),",
                "  watermark for rt as rt - interval '1' second",
                ") WITH (",
                "  'connector' = 'filesystem',",
                "  'path' = 'file:///" + csvPath + "',",
                "  'format' = 'csv'",
                ")");

        // Create the table
        tableEnv.executeSql(createTableSql);

        // Select everything and show the resulting schema
        Table allRows = tableEnv.sqlQuery("select * from InputTable");
        allRows.printSchema();

        // Print the rows as an append stream and start the job
        tableEnv.toAppendStream(allRows, Row.class).print();
        streamEnv.execute();
    }
}

总结:

  • 1- 建表时加上:`rt` AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`))
  • 2- 记得加上水印:watermark for rt as rt - interval '1' second

1.2.2 DataStream 转为 Table 时指定

例子:文件流转化为表

package cn.itcast.day02.time;

/**
 * @author lql
 * @time 2024-03-15 12:42:47
 * @description TODO
 */

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Event-time attribute declared while a DataStream is converted into a Table.
 *
 * <p>Orders are read from {@code order.csv}, given timestamps and watermarks, and then
 * converted twice: once with the rowtime attribute replacing the existing
 * {@code timestamp} field, and once with it appended as the new field {@code rt}.
 */
public class EventTimeDataStream {
    public static void main(String[] args) throws Exception {
        // Streaming environment, parallelism 1
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);

        // Table environment: Blink planner in streaming mode
        EnvironmentSettings blinkStreaming = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, blinkStreaming);

        // Read the raw lines of the input file resolved from the classpath
        String csvPath = EventTimeDataStream.class.getClassLoader().getResource("order.csv").getPath();
        DataStreamSource<String> rawLines = streamEnv.readTextFile(csvPath);

        // Parse every line into an OrderBean
        SingleOutputStreamOperator<OrderBean> orders = rawLines.map(new MapFunction<String, OrderBean>() {
            @Override
            public OrderBean map(String line) throws Exception {
                return parseOrder(line);
            }
        });

        // Bounded-out-of-orderness watermarks with zero allowed lateness;
        // timestamps are extracted from the record by OrderTimestampAssigner.
        WatermarkStrategy<OrderBean> strategy = WatermarkStrategy
                .<OrderBean>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new OrderTimestampAssigner());
        SingleOutputStreamOperator<OrderBean> timedOrders = orders.assignTimestampsAndWatermarks(strategy);

        // Variant 1: the rowtime attribute replaces the existing "timestamp" field
        Table replacedField = tableEnv.fromDataStream(
                timedOrders,
                $("id"), $("money"), $("category"), $("timestamp").rowtime());

        // Variant 2: the rowtime attribute is appended to the schema as the new field "rt"
        Table appendedField = tableEnv.fromDataStream(
                timedOrders,
                $("id"), $("timestamp"), $("money"), $("category"), $("rt").rowtime());

        // Convert both tables back to append streams and print them
        tableEnv.toAppendStream(replacedField, Row.class).print();
        tableEnv.toAppendStream(appendedField, Row.class).print();

        streamEnv.execute();
    }

    /** Splits one comma-separated line into id, timestamp, money and category. */
    private static OrderBean parseOrder(String line) {
        String[] fields = line.split(",");
        return new OrderBean(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]), fields[3]);
    }

    /** One order record parsed from a CSV line. */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderBean{
        private String id;
        private Long timestamp;
        private Double money;
        private String category;
    }

    /**
     * Supplies the event timestamp of an order: its {@code timestamp} field multiplied by 1000.
     */
    private static class OrderTimestampAssigner implements SerializableTimestampAssigner<OrderBean> {
        @Override
        public long extractTimestamp(OrderBean order, long recordTimestamp) {
            return order.getTimestamp() * 1000L;
        }
    }
}

总结:

  • 1- 替换现有字段:$("timestamp").rowtime()
  • 2- 作为新字段追加到 schema:$("rt").rowtime()
  • 3- 事件时间流要使用水印操作!

2. 时区特性

2.1 TimeStamp 和 TimeStamp_LTZ 区别
  • TIMESTAMP:在Flink中,TIMESTAMP类型相当于一个字符串类型。无论作业的时区如何变化,得到的字符串都是不变的
  • TIMESTAMP_LTZ:全球统一的时间点类型,其底层实现是Bigint类型。当将其转换为字符串时,结果会根据作业时区改变。

案例演示:

# Create a view with two time columns: ltz = TO_TIMESTAMP_LTZ(4001, 3) and ntz = TIMESTAMP '1970-01-01 00:00:01.001'
Flink SQL> CREATE VIEW MyView2 AS SELECT TO_TIMESTAMP_LTZ(4001, 3) AS ltz, TIMESTAMP '1970-01-01 00:00:01.001'  AS ntz;

# Describe the structure of the view
Flink SQL> DESC MyView2;

情况一:无明显变化

# Set the session time zone to UTC (Greenwich time), then query the view
Flink SQL> SET table.local-time-zone=UTC;
Flink SQL> SELECT * FROM MyView2;

情况二:设置上海时间,LTZ 时间动态变化,NTZ 时间不变

# Set the session time zone to Asia/Shanghai, then query the view again
Flink SQL> SET table.local-time-zone=Asia/Shanghai;
Flink SQL> SELECT * FROM MyView2;

2.2 代码中设置时间
// Build a table environment from default settings.
EnvironmentSettings defaultSettings = EnvironmentSettings.newInstance().build();
TableEnvironment tEnv = TableEnvironment.create(defaultSettings);

// Each call below sets the table config's local time zone.
ZoneId utc = ZoneId.of("UTC");
tEnv.getConfig().setLocalTimeZone(utc);

ZoneId shanghai = ZoneId.of("Asia/Shanghai");
tEnv.getConfig().setLocalTimeZone(shanghai);

ZoneId losAngeles = ZoneId.of("America/Los_Angeles");
tEnv.getConfig().setLocalTimeZone(losAngeles);

2.3 时间属性和时区
2.3.1 时间新特性
  • 自 Flink 1.13 起,PROCTIME() 的返回类型是 TIMESTAMP_LTZ,会按作业时区(table.local-time-zone)显示本地时间;也就是说,proctime() 不用设置上海时间也能返回,但是为了保险起见,可以显式设置想要的时区!

2.3.2 Socket 数据源案例

需求:创建表从 Socket 获取数据,分别设置 UTC 时区 和 Asia/Shanghai 时区查看时间字段的变化

需要 jar 包:ChangelogSocketExample.jar(我的下载区准备好了哦,需要的可以自行下载!)

# 创建 socket 数据表
# Create a table that reads CSV records through the socket connector (host node1, port 9999);
# the proctime column is computed with PROCTIME()
Flink SQL> CREATE TABLE MyTable1 (
                  item STRING,
                  price DOUBLE,
                  proctime as PROCTIME()
            ) WITH (
                'connector' = 'socket',
                'hostname' = 'node1',
                'port' = '9999',
                'format' = 'csv'
           );

3. 时态表

3.1 版本表和普通表
  • 版本表:能够记录访问历史版本,来自数据库的 changelog 可以定义为版本表!
  • 普通表:只能记录访问最新版本,HBase 的表可以定义为普通表!

3.2 时态表函数

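时态表函数和时态表 DDL 最大的区别在于:时态表 DDL 可以直接在 SQL 中定义;而时态表函数无法用 SQL 定义,只能通过 Table API(createTemporalTableFunction)创建并注册后,再在 SQL 中用 LATERAL TABLE 调用。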

3.2.1 批的方式时态表

例子:订单表跟着汇率表变化!

package cn.itcast.day02.Temproal;

/**
 * @author lql
 * @time 2024-03-16 12:39:11
 * @description TODO
 */
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.table.api.Expressions.*;

/**
 * Computes order amounts with a temporal table function over two bounded collections.
 *
 * <p>Two inputs are used: an order stream and a currency-rate stream. The rate stream is
 * exposed as a temporal table function keyed by currency and versioned by event time, and
 * each order is joined with the rate valid at the order's event time.
 */
public class TemporalTablesFunctionBatch {
    public static void main(String[] args) throws Exception {
        // Streaming environment, parallelism 1
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);

        // Table environment: Blink planner in streaming mode
        EnvironmentSettings blinkStreaming = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, blinkStreaming);

        // Orders: (amount, currency, event time in seconds)
        List<Tuple3<Double, String, Long>> orders = new ArrayList<>();
        orders.add(Tuple3.of(7D, "Euro", 2L));
        orders.add(Tuple3.of(7D, "US Dollar", 3L));
        orders.add(Tuple3.of(0.05D, "Yen", 4L));
        orders.add(Tuple3.of(8D, "Euro", 5L));

        SingleOutputStreamOperator<Tuple3<Double, String, Long>> orderStream = streamEnv
                .fromCollection(orders)
                .assignTimestampsAndWatermarks(TemporalTablesFunctionBatch.<Double, String>thirdFieldAsSeconds());

        // Rates: (currency, rate, event time in seconds)
        List<Tuple3<String, Integer, Long>> rates = new ArrayList<>();
        rates.add(Tuple3.of("US Dollar", 102, 1L));
        rates.add(Tuple3.of("Euro", 114, 1L));
        rates.add(Tuple3.of("Yen", 1, 1L));
        rates.add(Tuple3.of("Euro", 116, 5L));
        rates.add(Tuple3.of("Euro", 117, 7L));

        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> rateStream = streamEnv
                .fromCollection(rates)
                .assignTimestampsAndWatermarks(TemporalTablesFunctionBatch.<String, Integer>thirdFieldAsSeconds());

        // Convert both streams to tables, exposing the record timestamp as the "rowtime" attribute
        Table orderTable = tableEnv.fromDataStream(orderStream, $("amount"), $("currency"), $("rowtime").rowtime());
        Table rateTable = tableEnv.fromDataStream(rateStream, $("currency"), $("rate"), $("rowtime").rowtime());

        // Register both tables as temporary views
        tableEnv.createTemporaryView("Orders", orderTable);
        tableEnv.createTemporaryView("RatesHistory", rateTable);

        // Turn the rate table into a temporal table function (time attribute: rowtime, key: currency)
        // and register it under the name used in the query below
        TemporalTableFunction ratesFunction = rateTable.createTemporalTableFunction($("rowtime"), $("currency"));
        tableEnv.createTemporaryFunction("Rates", ratesFunction);

        // Join every order with the rate looked up at the order's rowtime
        Table joined = tableEnv.sqlQuery(
                " SELECT o.currency, o.amount, r.rate, \n" +
                  "  o.amount * r.rate AS yen_amount \n" +
                  "  FROM \n" +
                  "  Orders AS o, \n" +
                  "  LATERAL TABLE (Rates(o.rowtime)) AS r \n" +
                  "  WHERE r.currency = o.currency");

        // Print the result on stderr and run the job
        tableEnv.toAppendStream(joined, Row.class).printToErr();
        streamEnv.execute();
    }

    /**
     * Watermark strategy with zero allowed out-of-orderness whose timestamp is the
     * tuple's third field multiplied by 1000.
     */
    private static <A, B> WatermarkStrategy<Tuple3<A, B, Long>> thirdFieldAsSeconds() {
        return WatermarkStrategy
                .<Tuple3<A, B, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<A, B, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<A, B, Long> record, long previousTimestamp) {
                        return record.f2 * 1000L;
                    }
                });
    }
}

结果:

+I[US Dollar, 7.0, 102, 714.0]
+I[Yen, 0.05, 1, 0.05]
+I[Euro, 7.0, 114, 798.0]
+I[Euro, 8.0, 116, 928.0]

总结:

  • 1- 两个流需要指定水印
  • 2- 流转化为表对象的时候,需要指定事件时间
  • 3- 将变化的表定义为时态函数,再注册到表环境中
  • 4- 关联查询,时态表函数中,变化的表要用 LATERAL TABLE

3.2.2 流的方式时态表

例子:从Kafka消费事件流(browse_event)和商品流(product_history_info)数据,并根据事件流中的商品id关联商品流的数据。

事件流:

{"userID": "user_001", "eventTime": "2021-01-01 00:00:00", "eventType": "browse", "productID": "product_005"}
{"userID": "user_001", "eventTime": "2021-01-01 00:00:01", "eventType": "browse", "productID": "product_005"}
{"userID": "user_001", "eventTime": "2021-01-01 00:00:02", "eventType": "browse", "productID": "product_005"}
{"userID": "user_001", "eventTime": "2021-01-01 00:00:03", "eventType": "browse", "productID": "product_005"}
{"userID": "user_001", "eventTime": "2021-01-01 00:00:04", "eventType": "browse", "productID": "product_005"}
{"userID": "user_001", "eventTime": "2021-01-01 00:00:05", "eventType": "browse", "productID": "product_005"}
{"userID": "user_001", "eventTime": "2021-01-01 00:00:06", "eventType": "browse", "productID": "product_005"}
{"userID": "user_002", "eventTime": "2021-01-01 00:00:01", "eventType": "browse", "productID": "product_003"}
{"userID": "user_002", "eventTime": "2021-01-01 00:00:02", "eventType": "browse", "productID": "product_003"}
{"userID": "user_002", "eventTime": "2021-01-01 00:00:05", "eventType": "browse", "productID": "product_003"}
{"userID": "user_002", "eventTime": "2021-01-01 00:00:06", "eventType": "browse", "productID": "product_003"}

商品流:

{"productID":"product_005","productName":"苹果电脑","productCategory":"电脑","updatedAt":"2021-01-01 00:00:00", "productPrice": 20}
{"productID":"product_005","productName":"苹果电脑","productCategory":"电脑","updatedAt":"2021-01-01 00:00:02", "productPrice": 30}
{"productID":"product_005","productName":"苹果电脑","productCategory":"电脑","updatedAt":"2021-01-01 00:00:05", "productPrice": 40}
{"productID":"product_003","productName":"华为手机","productCategory":"手机","updatedAt":"2021-01-01 00:00:02", "productPrice": 20}
{"productID":"product_003","productName":"华为手机","productCategory":"手机","updatedAt":"2021-01-01 00:00:05", "productPrice": 30}

代码:解析两个 json 数据源,两张表的拉宽操作

package cn.itcast.day02.Temproal;

/**
 * @author lql
 * @time 2024-03-16 13:50:43
 * @description TODO
 */

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;


import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import static org.apache.flink.table.api.Expressions.*;

/**
 * Widens product browse events with product information through a temporal table function.
 *
 * <p>Two Kafka topics are consumed as JSON strings:
 * <ol>
 *   <li>the product browse event stream, and</li>
 *   <li>the product information stream.</li>
 * </ol>
 * Both are parsed into POJOs, given event-time timestamps and watermarks, and converted
 * into tables. The product table is registered as a temporal table function keyed by
 * {@code productID}, and every browse event is joined with the product version looked up
 * at the event's rowtime.
 */
public class TemporalTablesFunctionStreaming {
    /** Pattern of the {@code eventTime} / {@code updatedAt} strings in the JSON records. */
    private static final DateTimeFormatter TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Offset the time strings are interpreted in (UTC+8).
     * {@code ZoneOffset.of("+8:00")} is not an accepted offset id and throws
     * {@code DateTimeException}; {@code ofHours(8)} expresses the same intent safely.
     */
    private static final ZoneOffset SOURCE_OFFSET = ZoneOffset.ofHours(8);

    /**
     * Runs the example.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        // Step 1: build the environments
        // 1.1 streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1.2 environment settings
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        // 1.3 table environment
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, bsSettings);

        // Step 2: build the sources
        // Kafka cluster address
        String kafkaBootstrapServers = "node1:9092";
        // topic of the browse event stream
        String browseTopic = "browseTopic2";
        // topic of the product information stream
        String productInfoTopic = "productHistoryInfoTopic2";
        // consumer group id for the browse event stream
        String browseTopicGroupID = "browseTopicGroupID_002";
        // consumer group id for the product information stream
        String productInfoTopicGroupID = "productInfoTopicGroupID_002";

        // 2.1 browse event source
        // Note: UserBrowseLog carries the extra field eventTimeTimestamp, the epoch-millisecond
        // form of eventTime, so both representations can be compared side by side.
        Properties browseProperties = kafkaProperties(kafkaBootstrapServers, browseTopicGroupID);

        DataStreamSource<String> browseStream = env.addSource(new FlinkKafkaConsumer<>(browseTopic, new SimpleStringSchema(), browseProperties));

        browseStream.print("事件流原始数据>>>");
        SingleOutputStreamOperator<UserBrowseLog> browseWatermarkStream = browseStream.process(new BrowseKafkaProcessFunction())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<UserBrowseLog>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<UserBrowseLog>() {
                            @Override
                            public long extractTimestamp(UserBrowseLog element, long l) {
                                // Already in milliseconds: set while the JSON was turned into the bean
                                return element.getEventTimeTimestamp();
                            }
                        }));

        browseWatermarkStream.print("事件流水印数据>>>");

        // 2.2 product information source
        Properties productInfoProperties = kafkaProperties(kafkaBootstrapServers, productInfoTopicGroupID);

        DataStreamSource<String> productInfoStream = env.addSource(new FlinkKafkaConsumer<>(productInfoTopic, new SimpleStringSchema(), productInfoProperties));

        SingleOutputStreamOperator<ProductInfo> productInfoWatermarkStream = productInfoStream.process(new ProductInfoProcessFunction())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<ProductInfo>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<ProductInfo>() {
                            @Override
                            public long extractTimestamp(ProductInfo element, long l) {
                                return element.getUpdatedAtTimestamp();
                            }
                        }));

        productInfoWatermarkStream.printToErr("商品流水印数据>>>");

        // Step 3: convert the streams into tables.
        // The parsed, watermarked POJO streams must be used here: the raw Kafka streams carry
        // plain Strings, which have neither the referenced fields nor event-time timestamps.
        Table table_brow = tabEnv.fromDataStream(browseWatermarkStream, $("userID"), $("eventTime"),
                $("eventType"), $("productID"), $("eventTimeTimestamp"), $("browseRowtime").rowtime());

        Table table_product = tabEnv.fromDataStream(productInfoWatermarkStream, $("productID"), $("productName"),
                $("productCategory"), $("updatedAt"), $("updatedAtTimestamp"), $("productPrice"), $("productInfoRowtime").rowtime());

        // Step 4: register the tables as temporary views
        tabEnv.createTemporaryView("browse",table_brow);
        tabEnv.createTemporaryView("productInfo",table_product);

        // Step 5: expose the product table as a temporal table function
        // (time attribute: productInfoRowtime, key: productID) and register it.
        // Created directly from the Table object rather than through the deprecated scan().
        TemporalTableFunction productInfoFunction = table_product.createTemporalTableFunction($("productInfoRowtime"), $("productID"));

        tabEnv.createTemporaryFunction("productInfoFunc", productInfoFunction);

        // Step 6: join each browse event with the product version looked up at the event's rowtime
        String sql = ""
                + "SELECT "
                + "browse.userID, "
                + "browse.eventTime, "
                + "browse.eventTimeTimestamp, "
                + "browse.eventType, "
                + "browse.productID, "
                + "productInfo.productID, "
                + "productInfo.productName, "
                + "productInfo.productCategory, "
                + "productInfo.productPrice, "
                + "productInfo.updatedAt, "
                + "productInfo.updatedAtTimestamp "
                + "FROM "
                + " browse, "
                + " LATERAL TABLE (productInfoFunc(browse.browseRowtime)) as productInfo "
                + "WHERE "
                + " browse.productID=productInfo.productID";

        // Step 7: run the query and print the result
        Table table = tabEnv.sqlQuery(sql);
        tabEnv.toAppendStream(table, Row.class).print();

        // run the job
        env.execute();
    }

    /**
     * Builds the consumer properties shared by both Kafka sources: start from the
     * earliest offset and disable auto commit.
     */
    private static Properties kafkaProperties(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("group.id", groupId);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return properties;
    }

    /** Parses a {@code yyyy-MM-dd HH:mm:ss} string as a UTC+8 time and returns epoch milliseconds. */
    private static long toEpochMillis(String dateTime) {
        OffsetDateTime parsed = LocalDateTime.parse(dateTime, TIME_FORMAT).atOffset(SOURCE_OFFSET);
        return parsed.toInstant().toEpochMilli();
    }

    /** One browse event; {@code eventTimeTimestamp} is filled in after JSON parsing. */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Long eventTimeTimestamp;
    }

    /** One version of a product; {@code updatedAtTimestamp} is filled in after JSON parsing. */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class ProductInfo implements Serializable {
        // product id
        private String productID;
        // product name
        private String productName;
        // product category
        private String productCategory;
        // update time
        private String updatedAt;
        // update time as epoch milliseconds
        private Long updatedAtTimestamp;
        private double productPrice;
    }

    /** Parses a browse-event JSON string and adds the epoch-millisecond event time. */
    private static class BrowseKafkaProcessFunction extends ProcessFunction<String,UserBrowseLog> {
        @Override
        public void processElement(String value, Context context, Collector<UserBrowseLog> collector) throws Exception {
            UserBrowseLog log = JSON.parseObject(value,UserBrowseLog.class);
            log.setEventTimeTimestamp(toEpochMillis(log.getEventTime()));
            collector.collect(log);
        }
    }

    /** Parses a product-information JSON string and adds the epoch-millisecond update time. */
    private static class ProductInfoProcessFunction extends ProcessFunction<String,ProductInfo>{
        @Override
        public void processElement(String value, Context context, Collector<ProductInfo> collector) throws Exception {
            ProductInfo log = JSON.parseObject(value, ProductInfo.class);
            log.setUpdatedAtTimestamp(toEpochMillis(log.getUpdatedAt()));
            collector.collect(log);
        }
    }
}

总结:将事件时间字符串按东八区(UTC+8)解析为毫秒级时间戳,并作为新字段添加到 Bean 中;转成表时要使用已解析并带水印的流


3.3时态表 Join VS 双流 Join

都可以管理 State;时态表 JOIN是单边驱动,是被动的查询;而双流JOIN是双边驱动,两边都是主动的进行JOIN计算。


3.3.1 基于处理时间 | 事件时间的时态 Join

语法:

SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1

3.3.2 案例演示

rateOrder 数据:

1,29,RMB,2021-01-02 00:00:00
2,19,RMB,2021-01-03 00:00:00
3,33,RMB,2021-01-11 00:00:00
4,55,RMB,2021-01-21 00:00:00

rateHistory 数据:

RMB,114,2021-01-01 00:00:00
RMB,115,2021-01-03 00:00:00
RMB,116,2021-01-19 00:00:00
Euro,119,2021-01-03 00:00:00
USD,99,2021-01-03 00:00:00
USD,100,2021-01-03 00:00:00
Euro,118,2021-01-03 00:00:00

代码:订单表和汇率表,将汇率表设置成时态表,用户根据订单表中的下单时间 Join 下单时的汇率表当时最新的维度数据

package cn.itcast.day02.Temproal;

/**
 * @author lql
 * @time 2024-03-16 15:32:44
 * @description TODO
 */

import cn.itcast.day01.example.DataStreamToTable;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Event-time temporal table join demo.
 *
 * <p>Requirement: given an order table and an exchange-rate table, treat the
 * exchange-rate table as a temporal (versioned) table so that each order can be
 * joined, by its order time, with the rate that was current at that moment.
 *
 * <p>Both tables are read from CSV files ({@code rateOrder.csv} and
 * {@code rateHistory.csv}) looked up through the class loader.
 */
public class TemporalTableJoinEventTime {
    /**
     * Builds the table environment, registers both CSV-backed tables, runs the
     * temporal join and prints the resulting rows.
     *
     * @param args unused
     * @throws IllegalStateException if one of the CSV resources is not found
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // 1) Build the table environment.
        // 1.1 Stream execution environment; parallelism is set to 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1.2 Environment settings: Blink planner in streaming mode.
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        // 1.3 Table environment on top of the stream environment.
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, bsSettings);

        // 2) Locate the input files and register the tables.
        String rateOrderPath = resourcePath("rateOrder.csv");
        String rateHistoryPath = resourcePath("rateHistory.csv");

        // Order table: order_time is the event-time attribute, with the watermark
        // declared as the column value itself.
        String sqlDDL =  "create table rateOrder (" +
                "                  order_id String," +
                "                  `price` DECIMAL(32,2)," +
                "                  currency String," +
                "                  order_time TIMESTAMP(3)," +
                "                  WATERMARK FOR order_time as order_time" +
                "                  ) with (" +
                "                  'connector' = 'filesystem'," +
                "                  'path' = 'file:///"+rateOrderPath+"'," +
                "                  'format' = 'csv'" +
                "                  )";

        tabEnv.executeSql(sqlDDL);

        // Rate table: the primary key (currency) plus the event-time attribute
        // update_time are what the temporal join below relies on.
        sqlDDL =  "create table rateHistory (" +
                "                  currency String," +
                "                  `conversion_rate` DECIMAL(32,2)," +
                "                  update_time TIMESTAMP(3)," +
                "                  PRIMARY KEY (currency) NOT ENFORCED," +
                "                  WATERMARK FOR update_time as update_time" +
                "                  ) with (" +
                "                  'connector' = 'filesystem'," +
                "                  'path' = 'file:///"+rateHistoryPath+"'," +
                "                  'format' = 'csv'" +
                "                  )";

        tabEnv.executeSql(sqlDDL);

        // 3) Join every order with rateHistory as of the order's order_time.
        String sql = "select order_id," +
                "   price," +
                "   rateOrder.currency," +
                "   conversion_rate," +
                "   order_time" +
                " from rateOrder" +
                "   left join rateHistory for system_time as of rateOrder.order_time" +
                "   on rateOrder.currency=rateHistory.currency";

        Table result = tabEnv.sqlQuery(sql);
        tabEnv.toAppendStream(result, Row.class).print();

        env.execute();
    }

    /**
     * Resolves a resource through this class's class loader and returns the path
     * component of its URL.
     *
     * @param resourceName resource name, e.g. {@code rateOrder.csv}
     * @return the path part of the resource URL
     * @throws IllegalStateException if the resource cannot be found; without this
     *         check the lookup result would be null and fail with a bare NPE
     */
    private static String resourcePath(String resourceName) {
        java.net.URL resource = TemporalTableJoinEventTime.class.getClassLoader().getResource(resourceName);
        if (resource == null) {
            throw new IllegalStateException("Resource not found on classpath: " + resourceName);
        }
        return resource.getPath();
    }
}

结果:

+I[1, 29.00, RMB, 114.00, 2021-01-02T00:00]
+I[2, 19.00, RMB, 115.00, 2021-01-03T00:00]
+I[3, 33.00, RMB, 115.00, 2021-01-11T00:00]
+I[4, 55.00, RMB, 116.00, 2021-01-21T00:00]

总结:

  • 1- 两个表都需要设置水印时间
  • 2- 时态表需要定义主键约束和事件时间
  • 3- 在关联查询时,左表通过 FOR SYSTEM_TIME AS OF 左表的时间属性(如 rateOrder.order_time)去关联时态表

3.4 Lookup Join

Lookup join通常用于使用从外部系统查询的数据来丰富表。连接要求一个表具有处理时间属性,另一个表由查找源连接器支持。

示例:右表是 MySQL 数据源

-- Lookup-side table: declared over the JDBC connector and mapped to the
-- `customers` table of the MySQL database named in 'url'.
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb?characterEncoding=utf-8&useSSL=false',
  'table-name' = 'customers'
);

-- enrich each order with customer information
-- FOR SYSTEM_TIME AS OF o.proc_time joins each Orders row against Customers as of
-- that row's proc_time, matching on customer id.
-- NOTE(review): Orders and its proc_time column are not defined in this snippet;
-- presumably proc_time is a processing-time attribute - confirm against the Orders DDL.
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

举报

相关推荐

0 条评论