Flink CDC Series (8): Ingesting MySQL Data into Iceberg

夏沐沐, 2022-03-23

Introduction

This article demonstrates reading MySQL data with Flink CDC and writing it to Iceberg through Flink SQL, covering insert, update, and delete operations.

Environment

Ubuntu 20.04
JDK 1.8
Maven 3.6.3
Flink 1.13.6
Iceberg 0.13.0

Preparing MySQL Test Data


mysql> CREATE DATABASE mydb;

mysql> USE mydb;

mysql> CREATE TABLE products (
       id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
       name VARCHAR(255) NOT NULL,
       description VARCHAR(512)
     );

mysql> INSERT INTO products VALUES (default,"scooter1","Small 1-wheel scooter");
Query OK, 1 row affected (0.01 sec)

Building Apache Iceberg from Source

See the article 《Apache Iceberg 源码编译 iceberg-flink-runtime Jar 文件》.
The iceberg-flink-runtime-1.13-0.13.0-SNAPSHOT.jar file it produces will be
needed later when preparing the Flink cluster.

Building Flink CDC from Source

See the article 《Flink CDC 系列(2)—— Flink CDC 源码编译》.
The jar file it produces will be needed later when preparing the Flink cluster.

Preparing the Flink Cluster

1. Download the Flink 1.13.6 binary distribution

axel -n 20 https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz

2. Extract the archive

tar xvf flink-1.13.6-bin-scala_2.11.tgz

3. Copy flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar (produced by the Flink CDC build) into the Flink lib directory

cp /opt/flink-cdc-connectors/flink-sql-connector-mysql-cdc/target/flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar /opt/flink-1.13.6/lib

4. Copy iceberg-flink-runtime-1.13-0.13.0-SNAPSHOT.jar (produced by the Apache Iceberg build) into the Flink lib directory

cp /opt/iceberg/flink/v1.13/flink-runtime/build/libs/iceberg-flink-runtime-1.13-0.13.0-SNAPSHOT.jar /opt/flink-1.13.6/lib

5. Edit /opt/flink-1.13.6/conf/workers

vi /opt/flink-1.13.6/conf/workers

Contents of the workers file:

localhost
localhost
localhost

This starts three TaskManager (worker) processes on the local machine.

6. Edit /opt/flink-1.13.6/conf/flink-conf.yaml

vi /opt/flink-1.13.6/conf/flink-conf.yaml

Set the parameter: taskmanager.numberOfTaskSlots: 2
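With three entries in the workers file and two slots per TaskManager, the session cluster exposes six task slots in total, which is plenty for this demo. The arithmetic as a trivial sketch:

```shell
# Each TaskManager (worker) contributes numberOfTaskSlots slots,
# so total cluster capacity is workers * slots per TaskManager.
workers=3          # three "localhost" lines in conf/workers
slots_per_tm=2     # taskmanager.numberOfTaskSlots: 2
total_slots=$((workers * slots_per_tm))
echo "total task slots: $total_slots"   # prints: total task slots: 6
```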

7. Download the Flink Hadoop uber jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar and copy it to the /opt/flink-1.13.6/lib directory

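Before starting the cluster, it is worth confirming that all three extra jars from steps 3, 4, and 7 actually landed in the lib directory. A small check script (the FLINK_LIB path is an assumption matching this article's layout; adjust it to your install):

```shell
# Verify the three jars added in steps 3, 4 and 7 are present.
# FLINK_LIB is assumed to match this article's install path.
FLINK_LIB=${FLINK_LIB:-/opt/flink-1.13.6/lib}
for jar in \
    flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar \
    iceberg-flink-runtime-1.13-0.13.0-SNAPSHOT.jar \
    flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
do
    if [ -f "$FLINK_LIB/$jar" ]; then
        echo "OK       $jar"
    else
        echo "MISSING  $jar"
    fi
done
```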

8. Start the standalone cluster

cd /opt/flink-1.13.6
bin/start-cluster.sh

9. Verify that the JobManager and TaskManager processes are alive

$ jps -m
9824 Jps -m
9143 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
8875 StandaloneSessionClusterEntrypoint --configDir /opt/flink-1.13.6/conf --executionMode cluster -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=469762048b -D jobmanager.memory.jvm-overhead.max=201326592b
9403 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
9727 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
ubuntu@ubuntu:/opt/flink-1.13.6$

Demo

Tip: open two terminal windows, one for MySQL commands and one for Flink SQL commands.

1. Start the Flink SQL Client

cd /opt/flink-1.13.6
bin/sql-client.sh

2. Run DDL and queries in the Flink SQL Client

Flink SQL> SET execution.checkpointing.interval = 3s;
-- Create the mysql-cdc source table
Flink SQL> CREATE TABLE products (
     id INT,
     name STRING,
     description STRING,
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = '192.168.64.6',
     'port' = '3306',
     'username' = 'test',
     'password' = 'test',
     'database-name' = 'mydb',
     'table-name' = 'products'
   );
[INFO] Execute statement succeed.
Flink SQL> select * from products;
id                 name                 description
1                 scooter1          Small 1-wheel scooter

-- Create the Iceberg sink table
-- Iceberg data is stored in the local directory file:///opt/data/warehouse
-- Other file systems such as HDFS can be used if available
-- format-version 2 is required for row-level updates and deletes
Flink SQL> CREATE TABLE products_sink (
     id INT,
     name STRING,
     description STRING,
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector'='iceberg',
     'catalog-name'='iceberg_catalog',
     'catalog-type'='hadoop',
     'warehouse'='file:///opt/data/warehouse',
     'format-version'='2'
   );
[INFO] Execute statement succeed.

-- Write the mysql-cdc source table into Iceberg (this runs as a continuous streaming job)
Flink SQL> insert into products_sink select * from products;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5a469a7ebda2383a831b63a3aed2b2f3

-- Query the Iceberg table
Flink SQL> select * from products_sink;
id                 name                 description
1                 scooter1          Small 1-wheel scooter


3. Insert a new row in the MySQL client

mysql> INSERT INTO products VALUES (default,"scooter2","Small 2-wheel scooter");

4. Query again in the Flink SQL Client

Flink SQL> select * from products_sink;
id                 name                 description
1                 scooter1          Small 1-wheel scooter
2                 scooter2         Small 2-wheel scooter
-- The new row has been written to Iceberg

5. Update a row in the MySQL client

mysql> update products set name = 'scooter----1' where id = 1;

6. Query again in the Flink SQL Client

Flink SQL> select * from products_sink;
id                           name                    description
1                   scooter----1          Small 1-wheel scooter
2                       scooter2          Small 2-wheel scooter

-- The row with id = 1 has been updated in Iceberg

7. Delete a row in the MySQL client

mysql> delete from products where id  = 1;

8. Query again in the Flink SQL Client

Flink SQL> select * from products_sink;
id                           name                    description
2                       scooter2          Small 2-wheel scooter

-- The row with id = 1 has been deleted from Iceberg

9. Iceberg storage directory layout

$ cd /opt/data/warehouse
$ tree
.
└── default_database
    └── products_sink
        ├── data
        │   ├── 00000-0-9112ba9c-444f-4798-b1d8-17c9d79e4221-00003.parquet
        │   ├── 00000-0-9112ba9c-444f-4798-b1d8-17c9d79e4221-00143.parquet
        │   ├── 00000-0-9112ba9c-444f-4798-b1d8-17c9d79e4221-00167.parquet
        │   ├── 00000-0-9112ba9c-444f-4798-b1d8-17c9d79e4221-00168.parquet
        │   ├── 00000-0-9112ba9c-444f-4798-b1d8-17c9d79e4221-00315.parquet
        │   └── 00000-0-9112ba9c-444f-4798-b1d8-17c9d79e4221-00346.parquet
        └── metadata
            ├── 065b8543-4ceb-4ac5-b1c1-c32c8ed5a6f1-m0.avro
            ├── 0e334a80-5af4-4e26-84f9-aba1590b226f-m0.avro
            ├── 82b0a129-1275-4ea9-9c1b-8b594dc5b209-m0.avro
            ├── a2229b60-b099-4758-bbfe-ecce4c6bbbe0-m0.avro
            ├── cf767c3e-1dd5-4594-853c-4523c4035c62-m0.avro
            ├── cf767c3e-1dd5-4594-853c-4523c4035c62-m1.avro
            ├── snap-1038129902406854120-1-d13d6f1b-4b6d-442c-b890-fcb1e5c8ac5b.avro
            ├── snap-1704576157153627498-1-8b0968ad-13b6-403f-9227-4db5b7049f14.avro
            ├── snap-178148493310505832-1-a2229b60-b099-4758-bbfe-ecce4c6bbbe0.avro
            ├── snap-1806340930748900084-1-dc0b7031-ff14-4dbb-8f2b-10b5150946c1.avro
            ├── snap-1821270176101527479-1-40e4cda2-2045-474f-8000-fee08d0b0b89.avro
            ├── snap-2084053056680857129-1-6f24cdc8-e92b-4ac1-a261-25443d9a61ad.avro
            ├── snap-2243697033954591765-1-05513fc3-c627-4b2d-8482-cc431f5d3092.avro
            ├── snap-2588751793768843907-1-1cacca71-5248-4f6a-8140-2c45bb6399df.avro
            ├── snap-2840275162584951513-1-7195716d-fec4-429b-8b8b-9adbf14c3290.avro
            ├── snap-3186443929475318225-1-f4161850-bd17-4403-b8ff-83ba33724c4c.avro
            ├── snap-3224849802565407046-1-10af2534-34ba-472c-b44c-6b246be153b9.avro
            ├── snap-3807080408249452424-1-28f9c973-554e-40d8-aff8-232e926158e8.avro
            ├── snap-4372125761906099061-1-065b8543-4ceb-4ac5-b1c1-c32c8ed5a6f1.avro
            ├── snap-4651567489753397852-1-83245c09-f56d-4e22-bfbb-71737b948e64.avro
            ├── snap-5069479447920727265-1-b6814152-06ed-43be-a032-b0f5cae5da07.avro
            ├── snap-5490176109434915189-1-9683fe20-6c7a-48e6-b24b-13c629d3d9e8.avro
            ├── snap-5491693674050678453-1-dbb1f238-7bd2-4fe7-b4cc-231361bbf31d.avro
            ├── snap-6172797983803291817-1-56e2a11c-da88-4dd3-b2a0-d572a7a64884.avro
            ├── snap-6245440190107425990-1-0e334a80-5af4-4e26-84f9-aba1590b226f.avro
            ├── snap-6461640912281765723-1-36bc810a-1fcc-49c8-b135-57200d0969c5.avro
            ├── snap-6507744750658002810-1-cf767c3e-1dd5-4594-853c-4523c4035c62.avro
            ├── snap-6546253251216199629-1-82b0a129-1275-4ea9-9c1b-8b594dc5b209.avro
            ├── snap-6718533968457740716-1-54d39158-12f2-4422-a27f-27a208318245.avro
            ├── snap-8316323063598850681-1-d0face11-47ed-4775-880b-79b8be4ee5c5.avro
            ├── snap-83259785047788845-1-87d59a26-bf8d-4e9f-9dee-99c86bb7e873.avro
            ├── snap-8999685882947129639-1-4628d9d8-2d0d-4646-ad0b-064d4e9852fc.avro
            ├── v1.metadata.json
            ├── v10.metadata.json
            ├── v11.metadata.json
            ├── v12.metadata.json
            ├── v13.metadata.json
            ├── v14.metadata.json
            ├── v15.metadata.json
            ├── v16.metadata.json
            ├── v17.metadata.json
            ├── v18.metadata.json
            ├── v19.metadata.json
            ├── v2.metadata.json
            ├── v20.metadata.json
            ├── v21.metadata.json
            ├── v22.metadata.json
            ├── v23.metadata.json
            ├── v24.metadata.json
            ├── v25.metadata.json
            ├── v26.metadata.json
            ├── v27.metadata.json
            ├── v3.metadata.json
            ├── v4.metadata.json
            ├── v5.metadata.json
            ├── v6.metadata.json
            ├── v7.metadata.json
            ├── v8.metadata.json
            ├── v9.metadata.json
            └── version-hint.text

4 directories, 66 files
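With a Hadoop catalog there is no metastore service: readers locate the current table state through version-hint.text, which stores the number N of the latest vN.metadata.json. A minimal sketch of that lookup, simulated in a scratch directory so it does not touch the warehouse above (the value 27 mirrors the listing, whose newest metadata file is v27.metadata.json):

```shell
# Recreate the metadata layout from the tree above in a temp directory.
WAREHOUSE=$(mktemp -d)
META="$WAREHOUSE/default_database/products_sink/metadata"
mkdir -p "$META"
echo 27 > "$META/version-hint.text"   # newest version in the listing is v27

# A Hadoop-catalog reader resolves the current metadata file like this:
v=$(cat "$META/version-hint.text")
echo "current metadata file: v${v}.metadata.json"
# prints: current metadata file: v27.metadata.json
```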