测试环境:
操作系统: centos7
Flink: 1.18.0
测试flink sql所需驱动(需要根据对应得flink版本,取用相应版本的驱动)
下载好的驱动,放到flink目录下的lib目录
# flink目录树图
[root@localhost ~]# tree -L 1 /usr/local/flink-1.18.0/
/usr/local/flink-1.18.0/
├── bin
├── conf
├── examples
├── lib
├── LICENSE
├── licenses
├── log
├── NOTICE
├── opt
├── plugins
└── README.txt
flink-connector-jdbc-3.1.1.jar
下载连接:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/
flink-sql-connector-mysql-cdc-3.0.0.jar
下载连接:https://github.com/ververica/flink-cdc-connectors/releases
mysql-connector-java-8.0.28.jar
下载连接:https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.28/
下载部署flink (本记录以1.18版本作为实验版本)
1、根据个人习惯把二进制包下载到/opt目录下
cd /opt
2、执行命令下载二进制包
wget https://www.apache.org/dyn/closer.lua/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz --no-check-certificate
3、根据个人习惯解压二进制包到/usr/local
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz -C /usr/local
4、启动flink
cd flink的所在目录
cd /usr/local/flink-1.18.0/bin
执行启动服务命令
./start-cluster.sh
5、通过浏览器访问IP:8081,验证服务启动成功
使用flink sql创建数据同步作业
1、启动sql-client.sh
./sql-client.sh embedded
2、根据需要同步的数据创建源表
CREATE TABLE nbaplayers (
player_id INT,
team_id INT,
player_name VARCHAR,
height FLOAT,
PRIMARY KEY (player_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.186.129',
'port' = '3306',
'username' = 'root',
'password' = 'Admin@123698745',
'database-name' = 'nba',
'table-name' = 'player',
'server-time-zone' = 'Asia/Shanghai'
);
3、创建结果表
CREATE TABLE sink (
player_id INT,
team_id INT,
player_name VARCHAR,
height FLOAT,
PRIMARY KEY (player_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.186.129:3306/nba?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'Admin@123698745',
'table-name' = 'player_sink'
);
4、执行从源表插入结果表操作,生成同步作业
INSERT INTO sink
SELECT
player_id,
team_id,
player_name,
height
FROM nbaplayers;
验证同步任务创建完成
1、检查面板
2、检查数据库中数据同步情况
当源表数据发生“增”,”删“,”改“变化时,结果表也同步发生变化。
本次实验只实现了单个数据表的数据同步,因为刚刚接触到这种内容,还在学习过程中,还有更多内容需要进一步学习和试验,感觉flink cdc社区的老师耐心指导和帮助。