文章目录
第一章 初识 Canal
1、什么是canal
Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。
Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)。
2、什么是Binlog
二进制日志包含两类文件:二进制日志索引文件(文件名后缀为 .index)用于记录所有二进制文件,二进制日志文件(文件名后缀为 .00000*)记录数据库所有 DDL 和 DML(除了数据查询语句)语句事件。
3、Binlog 的分类
4、Canal 工作原理
4.1 Mysql 主从复制过程
4.2 Canal 的工作原理
很简单,就是把自己伪装成 Slave ,假装从 Master 复制数据
4.3 Canal 的使用场景
第二章 Mysql 准备
1、创建数据库
2、创建表
CREATE TABLE user_info(
`id` VARCHAR(255),
`name` VARCHAR(255),
`sex` VARCHAR(255)
);
3、修改配置文件开启 Binlog
vi /etc/my.cnf
注意:binlog-do-db 根据自己的情况进行修改,指定具体要同步的数据库,如果不配置则表示所有数据库均开启 Binlog
4、重启 MySQL 使配置生效
systemctl restart mysqld
到 /var/lib/mysql 目录下查看初始文件大小 154
5、测试 Binlog 是否开启
插入数据
INSERT INTO user_info VALUES('1000','zhangsan','male');
再次查看 index 文件大小
6、赋权限
在 mysql 中执行
set global validate_password_length=4;
set global validate_password_policy=0;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
第三章 Canal 下载和安装
1、下载并解压 Jar 包
下载地址 https://github.com/alibaba/canal/releases
2、查看 canal.properties 的配置(不做修改)
说明:这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,修改 canal 的输出 model,默认 tcp,改为输出到 kafka多实例配置如果创建多个实例,通过前面 canal 架构,我们可以知道,一个 canal 服务中可以有多个 instance,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改canal.properties 中的 canal.destinations=实例 1,实例 2,实例 3。
3、修改 instance.properties 的配置
我们这里只读取一个 MySQL 数据,所以只有一个实例,这个实例的配置文件在conf/example 目录下
1)配置 MySQL 服务器地址
2)配置连接 MySQL 的用户名和密码,默认就是我们前面授权的 canal
第四章 实时监控测试
1、TCP 模式测试
1.1 创建 canal 项目
1.2 导入依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
1.3 通用监视类 ——CanalClient
Canal 封装的数据结构
在 canal 模块下创建 com.ausware.app 包,并在包下创建 CanalClient(java 代码)
代码如下:
/**
* canal客户端
* @author Yao
* @Date: 2022/2/15 15:28
*/
public class CanalClient {
public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
//连接Canal服务器
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("node1", 11111), "example", "", "");
while (true){
canalConnector.connect();
//订阅数据库 .* 代表该数据库下所有表
canalConnector.subscribe("canal_test.*");
//单次拉取1000条变化数据
Message message = canalConnector.get(100);
List<CanalEntry.Entry> entries = message.getEntries();
if (CollectionUtils.isEmpty(entries)){
System.out.println("本次拉取数据为空,稍后拉去....");
TimeUnit.SECONDS.sleep(2);
}else {
for (CanalEntry.Entry entry : entries) {
//获取表名
String tableName = entry.getHeader().getTableName();
//
CanalEntry.EntryType entryType = entry.getEntryType();
//获取序列化后的数据
ByteString storeValue = entry.getStoreValue();
//确保类型为ROWDATA
if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
//对数据进行反序列化
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDataList) {
JSONObject beforeData = new JSONObject();
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
beforeData.put(column.getName(), column.getValue());
}
JSONObject afterData = new JSONObject();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
afterData.put(column.getName(), column.getValue());
}
System.out.println("表名:"+tableName+
",entryType:"+entryType+
",before"+beforeData+
",after"+afterData);
}
}
}
}
}
}
2、kafka 模式测试
1.修改 canal.properties 中 canal 的输出 model,默认 tcp,改为输出到 kafka
2.修改 Kafka 集群的地址
3.修改 instance.properties 输出到 Kafka 的主题以及分区数
注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱 binlog 的顺序 , 如 果 要 提 高 并 行 度 , 首 先 设 置 kafka 的 分 区 数 >1, 然 后 设 置canal.mq.partitionHash 属性
4.启动 Canal
./start.sh
5.看到 CanalLauncher 你表示启动成功,同时会创建 canal_test 主题
如果没有正常启动,查看日志报一下错误
打开 start.sh 文件修改参数,重启
str=`file -L $JAVA | grep 64-bit`
if [ -n "$str" ]; then
JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -Xss256k"
else
JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m "
fi
6.启动 Kafka 消费客户端测试,查看消费情况
7.向 MySQL 中插入数据后查看消费者控制台