Introduction
This article demonstrates reading data from MySQL with Flink CDC and writing it into Iceberg through Flink SQL, covering insert, update, and delete operations.
System Environment
Ubuntu 20.04
JDK 1.8
Maven 3.6.3
Flink 1.13.6
Iceberg 0.13.0
Preparing MySQL Test Data
mysql> CREATE DATABASE mydb;
mysql> USE mydb;
mysql> CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
mysql> INSERT INTO products VALUES (default,"scooter1","Small 1-wheel scooter");
Query OK, 1 row affected (0.01 sec)
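Flink CDC reads from the MySQL binlog, so before going further it is worth confirming that binlog is enabled and uses ROW format (a quick sanity check; the expected values are shown as comments, and your server configuration may differ):

```sql
-- log_bin should be ON and binlog_format should be ROW for CDC to work
mysql> SHOW VARIABLES LIKE 'log_bin';
mysql> SHOW VARIABLES LIKE 'binlog_format';
```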
Compiling Apache Iceberg from Source
See the article "Compiling the iceberg-flink-runtime Jar from Apache Iceberg Source".
The iceberg-flink-runtime-1.13-0.13.0-SNAPSHOT.jar file produced by the build is needed later in the Flink cluster preparation.
Compiling Flink CDC from Source
See the article "Flink CDC Series (2): Compiling Flink CDC from Source".
The jar file produced by the build is needed later in the Flink cluster preparation.
Preparing the Flink Cluster
1. Download the Flink 1.13.6 binary package
axel -n 20 https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz
2. Extract the archive
tar xvf flink-1.13.6-bin-scala_2.11.tgz
3. Copy flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar into the Flink lib directory; this file was produced by the Flink CDC source build
cp /opt/flink-cdc-connectors/flink-sql-connector-mysql-cdc/target/flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar /opt/flink-1.13.6/lib
4. Copy iceberg-flink-runtime-1.13-0.13.0-SNAPSHOT.jar into the Flink lib directory; this file was produced by the Apache Iceberg source build
cp /opt/iceberg/flink/v1.13/flink-runtime/build/libs/iceberg-flink-runtime-1.13-0.13.0-SNAPSHOT.jar /opt/flink-1.13.6/lib
5. Edit /opt/flink-1.13.6/conf/workers
vi /opt/flink-1.13.6/conf/workers
Contents of the workers file:
localhost
localhost
localhost
This means three TaskManager (worker) processes will be started on the local machine.
6. Edit /opt/flink-1.13.6/conf/flink-conf.yaml
vi /opt/flink-1.13.6/conf/flink-conf.yaml
Set the parameter: taskmanager.numberOfTaskSlots: 2
With three workers at two slots each, the cluster provides six task slots in total.
7. Download the Flink Hadoop uber jar
Copy flink-shaded-hadoop-2-uber-2.7.5-10.0.jar into the /opt/flink-1.13.6/lib directory
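No download command is given above; one common source is Maven Central. A sketch that builds the URL from the version string (the repository layout is an assumption; mirrors may differ):

```shell
# Construct the Maven Central URL for the shaded Hadoop uber jar
BASE=https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber
VER=2.7.5-10.0
JAR="flink-shaded-hadoop-2-uber-${VER}.jar"
echo "${BASE}/${VER}/${JAR}"
# Then fetch it into the Flink lib directory, for example:
# wget -P /opt/flink-1.13.6/lib "${BASE}/${VER}/${JAR}"
```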
8. Start the standalone cluster
cd /opt/flink-1.13.6
bin/start-cluster.sh
9. Check that the JobManager and TaskManager processes are running
$ jps -m
9824 Jps -m
9143 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
8875 StandaloneSessionClusterEntrypoint --configDir /opt/flink-1.13.6/conf --executionMode cluster -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=469762048b -D jobmanager.memory.jvm-overhead.max=201326592b
9403 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
9727 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
Walkthrough
Tip: open two terminal windows, one for MySQL commands and one for Flink SQL commands.
1. Start the Flink SQL Client
cd /opt/flink-1.13.6
bin/sql-client.sh
2. Run DDL and queries in the Flink SQL Client
Flink SQL> SET execution.checkpointing.interval = 3s;
-- The Iceberg sink commits data on Flink checkpoints, so a checkpoint interval must be set
-- Create the mysql-cdc source table
Flink SQL> CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.64.6',
'port' = '3306',
'username' = 'test',
'password' = 'test',
'database-name' = 'mydb',
'table-name' = 'products'
);
[INFO] Execute statement succeed.
Flink SQL> select * from products;
id name description
1 scooter1 Small 1-wheel scooter
-- Create the Iceberg sink table
-- Iceberg data is stored under the local directory file:///opt/data/warehouse
-- Any other file system, such as HDFS, can be used instead
-- format-version 2 is required for the row-level update/delete writes that CDC produces
Flink SQL> CREATE TABLE products_sink (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='iceberg',
'catalog-name'='iceberg_catalog',
'catalog-type'='hadoop',
'warehouse'='file:///opt/data/warehouse',
'format-version'='2'
);
[INFO] Execute statement succeed.
-- Write the mysql-cdc source table into Iceberg
Flink SQL> insert into products_sink select * from products;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5a469a7ebda2383a831b63a3aed2b2f3
-- Query the Iceberg table
Flink SQL> select * from products_sink;
id name description
1 scooter1 Small 1-wheel scooter
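The query above returns a one-off snapshot of the table. The Iceberg Flink connector can also watch the table continuously; a sketch using the connector's documented streaming read options (option names as documented for the Iceberg Flink connector; treat the interval value as illustrative):

```sql
-- Emit rows from newly committed snapshots as they appear
Flink SQL> SELECT * FROM products_sink /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */;
```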
3. Insert a new row from the MySQL client
mysql> INSERT INTO products VALUES (default,"scooter2","Small 2-wheel scooter");
4. Run the query again in the Flink SQL Client
Flink SQL> select * from products_sink;
id name description
1 scooter1 Small 1-wheel scooter
2 scooter2 Small 2-wheel scooter
-- The new row has been written to Iceberg
5. Update a row from the MySQL client
mysql> update products set name = 'scooter----1' where id = 1;
6. Run the query again in the Flink SQL Client
Flink SQL> select * from products_sink;
id name description
1 scooter----1 Small 1-wheel scooter
2 scooter2 Small 2-wheel scooter
-- The row with id=1 has been updated in Iceberg
7. Delete a row from the MySQL client
mysql> delete from products where id = 1;
8. Run the query again in the Flink SQL Client
Flink SQL> select * from products_sink;
id name description
2 scooter2 Small 2-wheel scooter
-- The row with id=1 has been deleted from Iceberg
9. Iceberg storage directory layout
$ cd /opt/data/warehouse
$ tree
.
└── default_database
└── products_sink
├── data
│ ├── 00000-0-9112ba9c-444f-4798-b1d8-17c9d79e4221-00003.parquet
│ ├── 00000-0-9112ba9c-444f-4798-b1d8-17c9d79e4221-00143.parquet
│ ├── 00000-0-9112ba9c-444f-4798-b1d8-17c9d79e4221-00167.parquet
│ ├── 00000-0-9112ba9c-444f-4798-b1d8-17c9d79e4221-00168.parquet
│ ├── 00000-0-9112ba9c-444f-4798-b1d8-17c9d79e4221-00315.parquet
│ └── 00000-0-9112ba9c-444f-4798-b1d8-17c9d79e4221-00346.parquet
└── metadata
├── 065b8543-4ceb-4ac5-b1c1-c32c8ed5a6f1-m0.avro
├── 0e334a80-5af4-4e26-84f9-aba1590b226f-m0.avro
├── 82b0a129-1275-4ea9-9c1b-8b594dc5b209-m0.avro
├── a2229b60-b099-4758-bbfe-ecce4c6bbbe0-m0.avro
├── cf767c3e-1dd5-4594-853c-4523c4035c62-m0.avro
├── cf767c3e-1dd5-4594-853c-4523c4035c62-m1.avro
├── snap-1038129902406854120-1-d13d6f1b-4b6d-442c-b890-fcb1e5c8ac5b.avro
├── snap-1704576157153627498-1-8b0968ad-13b6-403f-9227-4db5b7049f14.avro
├── snap-178148493310505832-1-a2229b60-b099-4758-bbfe-ecce4c6bbbe0.avro
├── snap-1806340930748900084-1-dc0b7031-ff14-4dbb-8f2b-10b5150946c1.avro
├── snap-1821270176101527479-1-40e4cda2-2045-474f-8000-fee08d0b0b89.avro
├── snap-2084053056680857129-1-6f24cdc8-e92b-4ac1-a261-25443d9a61ad.avro
├── snap-2243697033954591765-1-05513fc3-c627-4b2d-8482-cc431f5d3092.avro
├── snap-2588751793768843907-1-1cacca71-5248-4f6a-8140-2c45bb6399df.avro
├── snap-2840275162584951513-1-7195716d-fec4-429b-8b8b-9adbf14c3290.avro
├── snap-3186443929475318225-1-f4161850-bd17-4403-b8ff-83ba33724c4c.avro
├── snap-3224849802565407046-1-10af2534-34ba-472c-b44c-6b246be153b9.avro
├── snap-3807080408249452424-1-28f9c973-554e-40d8-aff8-232e926158e8.avro
├── snap-4372125761906099061-1-065b8543-4ceb-4ac5-b1c1-c32c8ed5a6f1.avro
├── snap-4651567489753397852-1-83245c09-f56d-4e22-bfbb-71737b948e64.avro
├── snap-5069479447920727265-1-b6814152-06ed-43be-a032-b0f5cae5da07.avro
├── snap-5490176109434915189-1-9683fe20-6c7a-48e6-b24b-13c629d3d9e8.avro
├── snap-5491693674050678453-1-dbb1f238-7bd2-4fe7-b4cc-231361bbf31d.avro
├── snap-6172797983803291817-1-56e2a11c-da88-4dd3-b2a0-d572a7a64884.avro
├── snap-6245440190107425990-1-0e334a80-5af4-4e26-84f9-aba1590b226f.avro
├── snap-6461640912281765723-1-36bc810a-1fcc-49c8-b135-57200d0969c5.avro
├── snap-6507744750658002810-1-cf767c3e-1dd5-4594-853c-4523c4035c62.avro
├── snap-6546253251216199629-1-82b0a129-1275-4ea9-9c1b-8b594dc5b209.avro
├── snap-6718533968457740716-1-54d39158-12f2-4422-a27f-27a208318245.avro
├── snap-8316323063598850681-1-d0face11-47ed-4775-880b-79b8be4ee5c5.avro
├── snap-83259785047788845-1-87d59a26-bf8d-4e9f-9dee-99c86bb7e873.avro
├── snap-8999685882947129639-1-4628d9d8-2d0d-4646-ad0b-064d4e9852fc.avro
├── v1.metadata.json
├── v10.metadata.json
├── v11.metadata.json
├── v12.metadata.json
├── v13.metadata.json
├── v14.metadata.json
├── v15.metadata.json
├── v16.metadata.json
├── v17.metadata.json
├── v18.metadata.json
├── v19.metadata.json
├── v2.metadata.json
├── v20.metadata.json
├── v21.metadata.json
├── v22.metadata.json
├── v23.metadata.json
├── v24.metadata.json
├── v25.metadata.json
├── v26.metadata.json
├── v27.metadata.json
├── v3.metadata.json
├── v4.metadata.json
├── v5.metadata.json
├── v6.metadata.json
├── v7.metadata.json
├── v8.metadata.json
├── v9.metadata.json
└── version-hint.text
4 directories, 66 files
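The version-hint.text file is how the hadoop catalog locates the current table state: it holds the number of the newest vN.metadata.json file. A sketch of that lookup against a mocked-up metadata directory (the paths and version number here are illustrative, not taken from the run above):

```shell
# Mock a metadata directory, then resolve the current metadata file name
# the way Iceberg's hadoop catalog does: read the hint, build "v<N>.metadata.json"
mkdir -p /tmp/iceberg_demo/metadata
echo "27" > /tmp/iceberg_demo/metadata/version-hint.text
V=$(cat /tmp/iceberg_demo/metadata/version-hint.text)
echo "current metadata file: v${V}.metadata.json"
```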