0
点赞
收藏
分享

微信扫一扫

记录flink sql实时同步mysql数据表到mysql

芝婵 2023-12-22 阅读 33

测试环境:

操作系统: centos7

Flink: 1.18.0


测试flink sql所需驱动(需要根据对应得flink版本,取用相应版本的驱动)

下载好的驱动,放到flink目录下的lib目录

# flink目录树图
[root@localhost ~]# tree -L 1 /usr/local/flink-1.18.0/
/usr/local/flink-1.18.0/
├── bin
├── conf
├── examples
├── lib
├── LICENSE
├── licenses
├── log
├── NOTICE
├── opt
├── plugins
└── README.txt

flink-connector-jdbc-3.1.1.jar

下载连接:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/


flink-sql-connector-mysql-cdc-3.0.0.jar

下载连接:https://github.com/ververica/flink-cdc-connectors/releases

记录flink sql实时同步mysql数据表到mysql_bc

mysql-connector-java-8.0.28.jar

下载连接:https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.28/


下载部署flink (本记录以1.18版本作为实验版本)

1、根据个人习惯把二进制包下载到/opt目录下

cd /opt


2、执行命令下载二进制包

wget https://www.apache.org/dyn/closer.lua/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz --no-check-certificate


3、根据个人习惯解压二进制包到/usr/local

tar -zxvf flink-1.18.0-bin-scala_2.12.tgz -C /usr/local


4、启动flink

cd flink的所在目录

cd /usr/local/flink-1.18.0/bin

执行启动服务命令

./start-cluster.sh


5、通过浏览器访问IP:8081,验证服务启动成功

记录flink sql实时同步mysql数据表到mysql_mysql_02


使用flink sql创建数据同步作业

1、启动sql-client.sh

./sql-client.sh embedded

记录flink sql实时同步mysql数据表到mysql_bc_03

2、根据需要同步的数据创建源表

CREATE TABLE nbaplayers (
  player_id INT,
  team_id INT,
  player_name VARCHAR,
  height FLOAT,
  PRIMARY KEY (player_id) NOT ENFORCED
  ) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.186.129',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Admin@123698745',
  'database-name' = 'nba',
  'table-name' = 'player',
  'server-time-zone' = 'Asia/Shanghai'
  );

记录flink sql实时同步mysql数据表到mysql_mysql_04

3、创建结果表

CREATE TABLE sink (
  player_id INT,
  team_id INT,
  player_name VARCHAR,
  height FLOAT,
  PRIMARY KEY (player_id) NOT ENFORCED
  ) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.186.129:3306/nba?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = 'Admin@123698745',
  'table-name' = 'player_sink'
   );

记录flink sql实时同步mysql数据表到mysql_mysql_05

4、执行从源表插入结果表操作,生成同步作业

INSERT INTO sink
 SELECT
 player_id,
 team_id,
 player_name,
 height
 FROM nbaplayers;

记录flink sql实时同步mysql数据表到mysql_flink_06


验证同步任务创建完成

1、检查面板

记录flink sql实时同步mysql数据表到mysql_flink_07


2、检查数据库中数据同步情况

记录flink sql实时同步mysql数据表到mysql_flink_08

当源表数据发生“增”,”删“,”改“变化时,结果表也同步发生变化。


本次实验只实现了单个数据表的数据同步,因为刚刚接触到这种内容,还在学习过程中,还有更多内容需要进一步学习和试验,感觉flink cdc社区的老师耐心指导和帮助。

举报

相关推荐

0 条评论