Flink CDC Series (11): Ingesting MySQL Data into Hudi and Querying Hudi in Streaming Mode

System Environment

Ubuntu 20.04
JDK 1.8
Maven 3.6.3
Flink 1.13.6
Hudi 0.10.1

Preparing MySQL Test Data


mysql> CREATE DATABASE mydb;

mysql> USE mydb;

mysql> CREATE TABLE products (
       id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
       name VARCHAR(255) NOT NULL,
       description VARCHAR(512)
     );

mysql> INSERT INTO products VALUES (default,"scooter1","Small 1-wheel scooter");
Query OK, 1 row affected (0.01 sec)

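Building the hudi-flink Module from Source

See the article "Building the hudi-flink Module from Source".
The build produces hudi-flink-bundle_2.11-0.10.1.jar, which is needed later when starting the Flink SQL Client.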

Building Flink CDC from Source

See the article "Flink CDC Series (2): Building Flink CDC from Source".
The JAR file produced by that build is needed in the Flink cluster setup below.

Preparing the Flink Cluster

1. Download the Flink 1.13.6 binary distribution

axel -n 20 https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz

2. Extract the archive (the later steps assume it is extracted under /opt, giving /opt/flink-1.13.6)

tar xvf flink-1.13.6-bin-scala_2.11.tgz

3. Copy flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar, produced by the Flink CDC source build, into Flink's lib directory

cp /opt/flink-cdc-connectors/flink-sql-connector-mysql-cdc/target/flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar /opt/flink-1.13.6/lib

4. Edit /opt/flink-1.13.6/conf/workers

vi /opt/flink-1.13.6/conf/workers

Contents of the workers file:

localhost
localhost
localhost
localhost

This starts four TaskManager (worker) processes on the local machine.

5. Edit /opt/flink-1.13.6/conf/flink-conf.yaml

vi /opt/flink-1.13.6/conf/flink-conf.yaml

Set the parameter (four slots per TaskManager): taskmanager.numberOfTaskSlots: 4

6. Download the Flink shaded Hadoop uber JAR, flink-shaded-hadoop-2-uber-2.7.5-10.0.jar, and copy it into the /opt/flink-1.13.6/lib directory

7. Start the standalone cluster

cd /opt/flink-1.13.6
bin/start-cluster.sh

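8. Check that the JobManager and TaskManager processes are alive. The output should list one StandaloneSessionClusterEntrypoint (the JobManager) and four TaskManagerRunner processes: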

$ jps -m
66561 Jps -m
60273 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=4.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=4 -D taskmanager.memory.jvm-overhead.max=201326592b
60002 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=4.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=4 -D taskmanager.memory.jvm-overhead.max=201326592b
60628 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=4.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=4 -D taskmanager.memory.jvm-overhead.max=201326592b
59470 StandaloneSessionClusterEntrypoint --configDir /opt/flink-1.13.6/conf --executionMode cluster -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=469762048b -D jobmanager.memory.jvm-overhead.max=201326592b
59742 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=4.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=4 -D taskmanager.memory.jvm-overhead.max=201326592b
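Before (re)starting the cluster, it can help to confirm that the JARs from steps 3 and 6 actually ended up in lib. The following is a minimal sketch; the helper name `missing_jars` is hypothetical, and the default path assumes the /opt/flink-1.13.6 layout used throughout this article.

```python
from pathlib import Path
from typing import List, Sequence

# JAR file names taken from steps 3 and 6 of this article.
REQUIRED_JARS = (
    "flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar",
    "flink-shaded-hadoop-2-uber-2.7.5-10.0.jar",
)


def missing_jars(lib_dir: str, required: Sequence[str] = REQUIRED_JARS) -> List[str]:
    """Return the names from `required` that are not regular files in lib_dir."""
    lib = Path(lib_dir)
    return [name for name in required if not (lib / name).is_file()]


if __name__ == "__main__":
    # Assumption: Flink was extracted to /opt/flink-1.13.6 as in this article.
    missing = missing_jars("/opt/flink-1.13.6/lib")
    print("all required JARs present" if not missing else f"missing: {missing}")
```

If anything is reported as missing, copy it into lib and restart the cluster, since Flink only loads lib at process start.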
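The jps output above can also be read programmatically. The sketch below (helper names `parse_dynamic_properties` and `summarize_cluster` are hypothetical) counts the TaskManagerRunner processes started from conf/workers, multiplies by the taskmanager.numberOfTaskSlots value each one reports, and converts the byte-valued taskmanager.memory.* options to MiB. It assumes every TaskManager uses the same flink-conf.yaml, as in this single-machine setup, and the sample lines are shortened copies of the output above.

```python
import re
from typing import Dict, List


def parse_dynamic_properties(command_line: str) -> Dict[str, str]:
    """Collect the '-D key=value' pairs shown for a Flink process in `jps -m`."""
    return dict(re.findall(r"-D\s+([\w.\-]+)=(\S+)", command_line))


def bytes_to_mib(value: str) -> float:
    """Convert a byte value such as '67108864b' to MiB."""
    return int(value[:-1]) / (1024 * 1024)


def summarize_cluster(jps_lines: List[str]) -> Dict[str, object]:
    """Count TaskManagers and total slots; report TaskManager memory sizes in MiB."""
    task_managers = [line for line in jps_lines if "TaskManagerRunner" in line]
    # Assumption: all TaskManagers share one config, so the first one is representative.
    props = parse_dynamic_properties(task_managers[0]) if task_managers else {}
    # Flink's default is 1 slot per TaskManager when the option is absent.
    slots = int(props.get("taskmanager.numberOfTaskSlots", "1"))
    memory = {
        key: round(bytes_to_mib(value), 1)
        for key, value in props.items()
        if key.startswith("taskmanager.memory.") and re.fullmatch(r"\d+b", value)
    }
    return {
        "task_managers": len(task_managers),
        "total_slots": len(task_managers) * slots,
        "memory_mib": memory,
    }


# Shortened lines from the `jps -m` output above (most -D options omitted).
tm = ("TaskManagerRunner --configDir /opt/flink-1.13.6/conf "
      "-D taskmanager.memory.network.min=67108864b "
      "-D taskmanager.memory.managed.size=241591914b "
      "-D taskmanager.numberOfTaskSlots=4")
jm = ("StandaloneSessionClusterEntrypoint --configDir /opt/flink-1.13.6/conf "
      "--executionMode cluster")
summary = summarize_cluster([jm, tm, tm, tm, tm])
print(summary["task_managers"], summary["total_slots"])  # 4 16
print(summary["memory_mib"])
# {'taskmanager.memory.network.min': 64.0, 'taskmanager.memory.managed.size': 230.4}
```

With four TaskManagers at four slots each, the local cluster offers 16 slots, which leaves room for both the CDC-to-Hudi insert job and the streaming query job used below.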

Testing

1. Start the Flink SQL Client

cd /opt/flink-1.13.6
# hudi-flink-bundle_2.11-0.10.1.jar is built from the hudi-flink module source; hive-exec-2.3.1.jar is taken from the local Maven repository
bin/sql-client.sh embedded -j /opt/hudi/packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.10.1.jar -j ~/.m2/repository/org/apache/hive/hive-exec/2.3.1/hive-exec-2.3.1.jar

2. Run the DDL statements and queries in the Flink SQL Client

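-- The Hudi Flink writer commits data when a checkpoint completes, so checkpointing must be enabled
Flink SQL> SET execution.checkpointing.interval = 3s;
-- tableau mode prints results, including the op column, directly in the terminal
Flink SQL> SET execution.result-mode=tableau;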
-- Create the mysql-cdc source table
Flink SQL> CREATE TABLE products (
     id INT,
     name STRING,
     description STRING,
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = '192.168.64.6',
     'port' = '3306',
     'username' = 'test',
     'password' = 'test',
     'database-name' = 'mydb',
     'table-name' = 'products'
   );
[INFO] Execute statement succeed.
Flink SQL> select * from products;
id                 name                 description
1                 scooter1          Small 1-wheel scooter

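-- Create the Hudi sink table
-- The Hudi data is stored in the local directory file:///opt/data/hudi/products
-- If you have one available, you can use another file system such as HDFS instead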
Flink SQL> CREATE TABLE products_sink (
     id int PRIMARY KEY NOT ENFORCED,
     name VARCHAR(20),
     description VARCHAR(64)
   ) WITH (
     'connector' = 'hudi',
     'path' = 'file:///opt/data/hudi/products',
     'table.type' = 'MERGE_ON_READ'
   );
[INFO] Execute statement succeed.

-- Write the data from the mysql-cdc source table into Hudi
Flink SQL> insert into products_sink select * from products;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: aaff4cdfa85261e58ac415f13ba94d86

-- Create a table for streaming queries over the Hudi data
Flink SQL> CREATE TABLE products_streaming_query (
     id int PRIMARY KEY NOT ENFORCED,
     name VARCHAR(20),
     description VARCHAR(64)
   ) WITH (
     'connector' = 'hudi',
     'path' = 'file:///opt/data/hudi/products',
     'table.type' = 'MERGE_ON_READ',
     'read.streaming.enabled' = 'true',  -- this option enables the streaming read
     'read.start-commit' = '20220314215057', -- specifies the commit instant time to start reading from
     'read.streaming.check-interval' = '4' -- interval in seconds for checking new source commits; default 60
   );

-- Query the data in the Hudi table
Flink SQL> select * from products_streaming_query;
+----+-------------+--------------------------------+--------------------------------+
| op |          id |                           name |                    description |
+----+-------------+--------------------------------+--------------------------------+
| +I |           1 |                       scooter1 |          Small 1-wheel scooter |
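-- This effectively submits a Flink streaming job in the background, and the output here refreshes in real time as new data arrives. To observe the changes, keep this query running and do not close the window.

-- The op column shows the change type of each row; +I means insert.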

3. Insert new rows in the MySQL client

mysql> INSERT INTO products VALUES (default,"scooter2","Small 2-wheel scooter");
mysql> INSERT INTO products VALUES (default,"scooter3","Small 3-wheel scooter");

4. Watch the output change in the Flink SQL Client

+----+-------------+--------------------------------+--------------------------------+
| op |          id |                           name |                    description |
+----+-------------+--------------------------------+--------------------------------+
| +I |           1 |                       scooter1 |          Small 1-wheel scooter |
| +I |           2 |                       scooter2 |          Small 2-wheel scooter |
| +I |           3 |                       scooter3 |          Small 3-wheel scooter |
-- The new rows with id 2 and 3 appear in real time.

5. Run an UPDATE in the MySQL client

mysql> UPDATE products SET name = 'scooter----3' WHERE id = 3;

6. Watch the output change in the Flink SQL Client

+----+-------------+--------------------------------+--------------------------------+
| op |          id |                           name |                    description |
+----+-------------+--------------------------------+--------------------------------+
| +I |           1 |                       scooter1 |          Small 1-wheel scooter |
| +I |           2 |                       scooter2 |          Small 2-wheel scooter |
| +I |           3 |                       scooter3 |          Small 3-wheel scooter |
| +I |           3 |                   scooter----3 |          Small 3-wheel scooter |
-- Row 3 is updated in Hudi as well; the new value arrives as another +I row for id 3.

7. Run a DELETE in the MySQL client

mysql> DELETE FROM products WHERE id = 3;

8. Watch the output change in the Flink SQL Client

+----+-------------+--------------------------------+--------------------------------+
| op |          id |                           name |                    description |
+----+-------------+--------------------------------+--------------------------------+
| +I |           1 |                       scooter1 |          Small 1-wheel scooter |
| +I |           2 |                       scooter2 |          Small 2-wheel scooter |
| +I |           3 |                       scooter3 |          Small 3-wheel scooter |
| +I |           3 |                   scooter----3 |          Small 3-wheel scooter |
| -D |           3 |                         (NULL) |                         (NULL) |

-- A "-D" (delete) row now appears for id 3.
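The streaming query returns a changelog rather than a snapshot: each row carries a Flink row kind (+I insert, -U update-before, +U update-after, -D delete), and in this run the update to id 3 arrived as a second +I row. A downstream consumer therefore has to apply the rows by primary key. The sketch below uses a hypothetical helper `apply_changelog`: it treats +I and +U as upserts and -U and -D as retractions of the key. Replaying the rows printed in step 8 leaves ids 1 and 2, matching the MySQL table after the DELETE.

```python
from typing import Dict, Iterable, Optional, Tuple

# (op, id, name, description), as printed by the tableau result mode.
Row = Tuple[str, int, Optional[str], Optional[str]]


def apply_changelog(rows: Iterable[Row]) -> Dict[int, Tuple[Optional[str], Optional[str]]]:
    """Fold changelog rows into the latest state per primary key (id)."""
    state: Dict[int, Tuple[Optional[str], Optional[str]]] = {}
    for op, key, name, description in rows:
        if op in ("+I", "+U"):
            state[key] = (name, description)  # insert, or overwrite an existing key (upsert)
        elif op in ("-U", "-D"):
            state.pop(key, None)  # retract whatever is stored for this key
        else:
            raise ValueError(f"unknown row kind: {op}")
    return state


# Rows shown by the streaming query in step 8.
changelog = [
    ("+I", 1, "scooter1", "Small 1-wheel scooter"),
    ("+I", 2, "scooter2", "Small 2-wheel scooter"),
    ("+I", 3, "scooter3", "Small 3-wheel scooter"),
    ("+I", 3, "scooter----3", "Small 3-wheel scooter"),
    ("-D", 3, None, None),
]
result = apply_changelog(changelog)
print(result)
# {1: ('scooter1', 'Small 1-wheel scooter'), 2: ('scooter2', 'Small 2-wheel scooter')}
```

Keying the state by id is what makes the duplicate +I rows for id 3 harmless: the later row simply overwrites the earlier one.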
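Why do the new rows in step 4 show up after a few seconds rather than immediately? Hudi's Flink writer commits buffered data when a checkpoint completes (every 3 s in this session), and the streaming reader looks for new commits every read.streaming.check-interval seconds (4 in the DDL above). The sketch below is a simplified timing model, an assumption rather than Hudi's actual scheduler: checkpoints complete instantly on a 3 s grid and the reader polls on a 4 s grid, both starting at t = 0. The helper name `visible_at` is hypothetical. Under this model a change never takes as long as the sum of the two intervals to become visible.

```python
import math


def visible_at(change_time: float, checkpoint_interval: float, check_interval: float) -> float:
    """Time (s) at which a change becomes visible to the streaming query.

    Simplified model (assumption): the Hudi writer commits at multiples of
    checkpoint_interval, and the streaming reader polls at multiples of
    check_interval. Checkpoint duration and processing time are ignored.
    """
    commit_time = math.ceil(change_time / checkpoint_interval) * checkpoint_interval
    return math.ceil(commit_time / check_interval) * check_interval


# Intervals from this article: checkpointing every 3 s, read.streaming.check-interval = 4.
print(visible_at(1.0, 3, 4))  # 4: committed at t=3, picked up by the poll at t=4
print(visible_at(3.5, 3, 4))  # 8: committed at t=6, picked up by the poll at t=8

# Sample change times from 0.1 s to 12.0 s and check the delay bound.
delays = [visible_at(t / 10, 3, 4) - t / 10 for t in range(1, 121)]
print(max(delays) < 3 + 4)  # True
```

With the default check interval of 60 s the same model predicts delays of up to about a minute, which is why the article lowers it to 4 for interactive observation.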
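The streaming-query table above sets 'read.start-commit' = '20220314215057'. Hudi instant times are timestamps, and this 14-digit value reads as yyyyMMddHHmmss, i.e. 2022-03-14 21:50:57. The instants of your own table appear as timeline files under the table's .hoodie directory (here /opt/data/hudi/products/.hoodie), so choose a start value that fits your data. The sketch below converts between this format and Python datetime. It assumes 14-digit, second-precision instants as used in this article; some Hudi versions write 17-digit instants that include milliseconds, which it does not parse. The helper names are hypothetical.

```python
from datetime import datetime

# Assumption: 14-digit, second-precision instants, e.g. '20220314215057'.
INSTANT_FORMAT = "%Y%m%d%H%M%S"


def to_instant(ts: datetime) -> str:
    """Format a datetime as a yyyyMMddHHmmss instant string."""
    return ts.strftime(INSTANT_FORMAT)


def from_instant(instant: str) -> datetime:
    """Parse a yyyyMMddHHmmss instant string back into a datetime."""
    return datetime.strptime(instant, INSTANT_FORMAT)


print(from_instant("20220314215057"))                  # 2022-03-14 21:50:57
print(to_instant(datetime(2022, 3, 14, 21, 50, 57)))  # 20220314215057

# Fixed-width instants compare chronologically as plain strings,
# e.g. to pick the ones at or after a chosen start instant.
instants = ["20220314215057", "20220314215103", "20220314214959"]
start = "20220314215057"
at_or_after = [i for i in sorted(instants) if i >= start]
print(at_or_after)  # ['20220314215057', '20220314215103']
```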