0
点赞
收藏
分享

微信扫一扫

Flink CDC 系列(9)—— MySQL 数据入湖 Iceberg,Flink 流式读取 Iceberg

陬者 2022-03-24 阅读 109

文章目录

简介

上一篇 《Flink CDC 系列(7)—— MySQL 数据入湖 Iceberg》介绍了Flink CDC 读取MySQL数据实时写到 Iceberg,Flink SQL 以Batch的方式读取Iceberg的数据。
与上一篇不同,本篇要介绍的是 Flink SQL 以 Streaming 的方式读取Iceberg的增量数据。

系统环境

Ubuntu 20.04
JDK 1.8
Maven 3.6.3
Flink 1.13.6
Flink CDC 2.2
Iceberg 0.13.0

MySQL 测试数据准备


mysql> CREATE DATABASE mydb;

mysql> USE mydb;

mysql> CREATE TABLE products (
       id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
       name VARCHAR(255) NOT NULL,
       description VARCHAR(512)
     );

mysql> INSERT INTO products VALUES (default,"scooter1","Small 1-wheel scooter");
Query OK, 1 row affected (0.01 sec)

Apache Iceberg 源码编译

参考文章《Apache Iceberg 源码编译 iceberg-flink-runtime Jar 文件》
编译产生的 iceberg-flink-runtime-1.13-0.13.0-SNAPSHOT.jar 文件在后面的 Flink 集群准备
需要用到。

Flink CDC 源码编译

参考文章《Flink CDC 系列(2)—— Flink CDC 源码编译》
编译产生的 Jar 文件在后面的 Flink 集群准备
需要用到。

Flink 集群准备

1. 下载 flink 1.13.6 的二进制安装包

axel -n 20 https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz

2. 解压

tar xvf flink-1.13.6-bin-scala_2.11.tgz

3. 将flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar 拷贝到 flink lib 目录下,该文件由 Flink CDC 源码编译得到

cp /opt/flink-cdc-connectors/flink-sql-connector-mysql-cdc/target/flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar /opt/flink-1.13.6/lib

4. 将 iceberg-flink-runtime-1.13-0.13.0-SNAPSHOT.jar 拷贝到 flink lib 目录下,该文件由Apache Iceberg源码编译得到

cp /opt/iceberg/flink/v1.13/flink-runtime/build/libs/iceberg-flink-runtime-1.13-0.13.0-SNAPSHOT.jar /opt/flink-1.13.6/lib

5. 修改 /opt/flink-1.13.6/conf/workers

vi /opt/flink-1.13.6/conf/workers

workers文件内容:

localhost
localhost
localhost

意思是要在本机启动三个work进程

6. 修改 /opt/flink-1.13.6/conf/flink-conf.yaml

vi  /opt/flink-1.13.6/conf/flink-conf.yaml

设置参数: taskmanager.numberOfTaskSlots: 2

7. 下载 flink hadoop uber jar 文件
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar, 文件拷贝到 /opt/flink-1.13.6/lib 目录下

在这里插入图片描述
8. 启动单机集群

cd /opt/flink-1.13.6
bin/start-cluster.sh

9. 查看 jobmanager 和 taskmanager 的进程是否存活

$ jps -m
9824 Jps -m
9143 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
8875 StandaloneSessionClusterEntrypoint --configDir /opt/flink-1.13.6/conf --executionMode cluster -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=469762048b -D jobmanager.memory.jvm-overhead.max=201326592b
9403 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
9727 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
ubuntu@ubuntu:/opt/flink-1.13.6$

演示开始

建议:打开两个命令行窗口,一个用来执行Mysql命令,一个用来执行Flink SQL 命令

1. 启动 Flink SQL Client

cd /opt/flink-1.13.6
bin/sql-client.sh

2. 在 Flink SQL Client 中执行 DDL 和 查询

Flink SQL> SET execution.checkpointing.interval = 3s;
-- 创建 mysql-cdc source 
Flink SQL> CREATE TABLE products (
     id INT,
     name STRING,
     description STRING,
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = '192.168.64.6',
     'port' = '3306',
     'username' = 'test',
     'password' = 'test',
     'database-name' = 'mydb',
     'table-name' = 'products'
   );
[INFO] Execute statement succeed.
Flink SQL> select * from products;
id                 name                 description
1                 scooter1          Small 1-wheel scooter

-- 创建 iceberg sink
-- iceberg数据存储在本地目录文件file:///opt/data/warehouse
-- 有条件的小伙伴可以使用其他文件系统,如HDFS
Flink SQL>	CREATE TABLE products_sink (
	    id int,
	    name    STRING,
	    description         STRING,
	    PRIMARY KEY (id) NOT ENFORCED
	  ) WITH (
	    'connector'='iceberg',
	    'catalog-name'='iceberg_catalog',
	    'catalog-type'='hadoop',  
	    'warehouse'='file:///opt/data/warehouse',
	    'format-version'='2'
	  );
INFO] Execute statement succeed.

-- mysql cdc source表的数据写入iceberg
Flink SQL> insert into products_sink select * from products;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5a469a7ebda2383a831b63a3aed2b2f3

-- 指定streaming方式读取数据
Flink SQL> SET execution.type = streaming ;
-- 允许dynamic table 的参数设置
Flink SQL> SET table.dynamic-table-options.enabled=true;

-- 查看iceberg表的数据
-- 设置两个参数:'streaming'='true', 'monitor-interval'='1s'
Flink SQL> SELECT * FROM products_sink /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
id                 name                 description
1                 scooter1          Small 1-wheel scooter
-- -- 这一步相当于在后台提交了一个Flink流式计算任务,当有新数据到来的时候,这里也会实时的刷新,为了方便观测,请勿退出该SQL或者关闭窗口

3. 在Mysql客户端插入新的数据

mysql> INSERT INTO products VALUES (default,"scooter2","Small 2-wheel scooter");

**4. 观察Flink SQL Client窗口的数据 **

id                 name                 description
1                 scooter1         Small 1-wheel scooter
2                 scooter2         Small 2-wheel scooter
-- Flink SQL 实时地查询到了iceberg的增量数据

5. 在Mysql客户端更新的数据

mysql> update products set name = 'scooter----1' where id = 1;

6. 观察Flink SQL Client窗口的数据

id                 name                 description
1                 scooter1         Small 1-wheel scooter
2                 scooter2         Small 2-wheel scooter
-- 数据没有变化

在日志有以下报错:

2022-03-16 22:23:05
java.lang.UnsupportedOperationException: Found overwrite operation, cannot support incremental data in snapshots (4131624424196760095, 7384798584482761312]
	at org.apache.iceberg.IncrementalDataTableScan.snapshotsWithin(IncrementalDataTableScan.java:121)
	at org.apache.iceberg.IncrementalDataTableScan.planFiles(IncrementalDataTableScan.java:72)
	at org.apache.iceberg.BaseTableScan.planTasks(BaseTableScan.java:210)
	at org.apache.iceberg.DataTableScan.planTasks(DataTableScan.java:29)
	at org.apache.iceberg.flink.source.FlinkSplitGenerator.tasks(FlinkSplitGenerator.java:99)
	at org.apache.iceberg.flink.source.FlinkSplitGenerator.createInputSplits(FlinkSplitGenerator.java:41)
	at org.apache.iceberg.flink.source.StreamingMonitorFunction.monitorAndForwardSplits(StreamingMonitorFunction.java:143)
	at org.apache.iceberg.flink.source.StreamingMonitorFunction.run(StreamingMonitorFunction.java:121)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

由此可见,Flink还不能以 Streaming 的方式读取 Iceberg 的增量update/delete数据。

总结

Flink 读取 Iceberg 数据有两种方式:

  • batch , 默认方式,启动一个 Flink 批处理作业来读取数据, 支持读取insert/update/delete 类型的操作数据。
  • streaming, 启动一个 Flink 流处理作业来读取数据,可以实时(进实时)地读取insert类型的操作数据,当前版本不支持update/delete 类型的操作数据。
举报

相关推荐

0 条评论