0
点赞
收藏
分享

微信扫一扫

windows下配置canal

伽马星系 2022-07-14 阅读 72

MySQL开启binlog

show variables like '%log_bin%';

如果为关闭状态,打开my.ini文件 ,添加如下配置

[mysqld]
#开启bin_log canal伪装为从库监听协议
server_id=1
log_bin=mysql-bin
log_bin-index=master-bin.index
binlog-format=ROW

下载canal

github比较慢,直接使用下载好的,这里使用的版本是1-4

mysql5.7没有user表执行以下语句初始化表

use mysql;
select * from user;
# 重置root密码为空
update user set authentication_string='' where user='root';

创建canal用户并授权

create user canal IDENTIFIED by 'canal';
grant select,replication slave,replication client on*.* to 'canal'@'%';
grant all privileges on *.* TO 'canal'@'%';
flush privileges;

配置canal监听MySQL数据库

复制conf下的example文件夹,并重名,修改新文件里面的配置文件,配置好数据库相关
在这里插入图片描述

进入bin目录,双击bat文件

在这里插入图片描述
报错了,解决办法:
将bin文件夹中的startup.bat中红框中删除
在这里插入图片描述

启动客户端监听

引入maven依赖

    <dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.5</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.5</version>
        </dependency>
    </dependencies>

最简单的demo

    public static void main(String[] args) {
        String ip = AddressUtils.getHostIp();
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111), "gold", "canal", "canal");
        canalConnector.connect();
        canalConnector.subscribe();
        int batch = 1000;
        Message message = canalConnector.get(batch);
        List<CanalEntry.Entry> entries = message.getEntries();
        System.out.println(entries);
    }

数据格式


### 随便修改数据库,打印entries数据,发现store里面的数据是压缩的(俗称扁平化的)
[
header {
  version: 1
  logfileName: "binlog.000005"
  logfileOffset: 842
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1656644165000
  sourceType: MYSQL
  schemaName: ""
  tableName: ""
  eventLength: 72
}
entryType: TRANSACTIONBEGIN
storeValue: " \a"
, 

header {
  version: 1
  logfileName: "binlog.000005"
  logfileOffset: 980
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1656644165000
  sourceType: MYSQL
  schemaName: "blog"
  tableName: "m_user"
  eventLength: 391
  eventType: UPDATE
  props {
    key: "rowsCount"
    value: "1"
  }
}
entryType: ROWDATA
storeValue: "\bl\020\002P\000b\221\a\n&\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0000\000B\0011R\nbigint(20)\n,\b\001\020\f\032\busername \000(\0000\000B\tmarkerhubR\vvarchar(64)\n\213\001\b\002\020\f\032\006avatar \000(\0000\000Bihttps://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload/images/5a9f48118166308daba8b6da7e466aab.jpgR\fvarchar(255)\n(\b\003\020\f\032\005email \000(\0000\000B\b11111111R\vvarchar(64)\nC\b\004\020\f\032\bpassword \000(\0000\000B 96e79218965eb72c92a549dd5a330112R\vvarchar(64)\n\035\b\005\020\004\032\006status \000(\0000\000B\0010R\006int(5)\n2\b\006\020]\032\acreated \000(\0000\000B\0232020-04-20 10:44:01R\bdatetime\n \b\a\020]\032\nlast_login \000(\0000\001R\bdatetime\022&\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0000\000B\0011R\nbigint(20)\022,\b\001\020\f\032\busername \000(\0000\000B\tmarkerhubR\vvarchar(64)\022\213\001\b\002\020\f\032\006avatar \000(\0000\000Bihttps://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload/images/5a9f48118166308daba8b6da7e466aab.jpgR\fvarchar(255)\022)\b\003\020\f\032\005email \000(\0010\000B\t111111111R\vvarchar(64)\022C\b\004\020\f\032\bpassword \000(\0000\000B 96e79218965eb72c92a549dd5a330112R\vvarchar(64)\022\035\b\005\020\004\032\006status \000(\0000\000B\0010R\006int(5)\0222\b\006\020]\032\acreated \000(\0000\000B\0232020-04-20 10:44:01R\bdatetime\022 \b\a\020]\032\nlast_login \000(\0000\001R\bdatetime"
, 

header {
  version: 1
  logfileName: "binlog.000005"
  logfileOffset: 1371
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1656644165000
  sourceType: MYSQL
  schemaName: ""
  tableName: ""
  eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\00235"
]

处理storevalue

//我们可以根据类型来过滤里面的数据,比如一些事物类型我们需要忽略,因此需要对数据进行一些处理。
   for (CanalEntry.Entry entry : entrys) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                        || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
        //这里需要把storeValue转为肉眼可见的数据格式
                      try {
                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                            System.out.println(rowChange);

                        } catch (InvalidProtocolBufferException e) {
                            e.printStackTrace();
                        }

}

rowchange具体格式:

tableId: 108
eventType: UPDATE
isDdl: false
rowDatas {
  beforeColumns {
    index: 0
    sqlType: -5
    name: "id"
    isKey: true
    updated: false
    isNull: false
    value: "1"
    mysqlType: "bigint(20)"
  }
  beforeColumns {
    index: 1
    sqlType: 12
    name: "username"
    isKey: false
    updated: false
    isNull: false
    value: "markerhub"
    mysqlType: "varchar(64)"
  }
  beforeColumns {
    index: 2
    sqlType: 12
    name: "avatar"
    isKey: false
    updated: false
    isNull: false
    value: "https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload/images/5a9f48118166308daba8b6da7e466aab.jpg"
    mysqlType: "varchar(255)"
  }
  beforeColumns {
    index: 3
    sqlType: 12
    name: "email"
    isKey: false
    updated: false
    isNull: false
    value: "111111111"
    mysqlType: "varchar(64)"
  }
  beforeColumns {
    index: 4
    sqlType: 12
    name: "password"
    isKey: false
    updated: false
    isNull: false
    value: "96e79218965eb72c92a549dd5a330112"
    mysqlType: "varchar(64)"
  }
  beforeColumns {
    index: 5
    sqlType: 4
    name: "status"
    isKey: false
    updated: false
    isNull: false
    value: "0"
    mysqlType: "int(5)"
  }
  beforeColumns {
    index: 6
    sqlType: 93
    name: "created"
    isKey: false
    updated: false
    isNull: false
    value: "2020-04-20 10:44:01"
    mysqlType: "datetime"
  }
  beforeColumns {
    index: 7
    sqlType: 93
    name: "last_login"
    isKey: false
    updated: false
    isNull: true
    mysqlType: "datetime"
  }
  afterColumns {
    index: 0
    sqlType: -5
    name: "id"
    isKey: true
    updated: false
    isNull: false
    value: "1"
    mysqlType: "bigint(20)"
  }
  afterColumns {
    index: 1
    sqlType: 12
    name: "username"
    isKey: false
    updated: false
    isNull: false
    value: "markerhub"
    mysqlType: "varchar(64)"
  }
  afterColumns {
    index: 2
    sqlType: 12
    name: "avatar"
    isKey: false
    updated: false
    isNull: false
    value: "https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload/images/5a9f48118166308daba8b6da7e466aab.jpg"
    mysqlType: "varchar(255)"
  }
  afterColumns {
    index: 3
    sqlType: 12
    name: "email"
    isKey: false
    updated: true
    isNull: false
    value: "111111"
    mysqlType: "varchar(64)"
  }
  afterColumns {
    index: 4
    sqlType: 12
    name: "password"
    isKey: false
    updated: false
    isNull: false
    value: "96e79218965eb72c92a549dd5a330112"
    mysqlType: "varchar(64)"
  }
  afterColumns {
    index: 5
    sqlType: 4
    name: "status"
    isKey: false
    updated: false
    isNull: false
    value: "0"
    mysqlType: "int(5)"
  }
  afterColumns {
    index: 6
    sqlType: 93
    name: "created"
    isKey: false
    updated: false
    isNull: false
    value: "2020-04-20 10:44:01"
    mysqlType: "datetime"
  }
  afterColumns {
    index: 7
    sqlType: 93
    name: "last_login"
    isKey: false
    updated: false
    isNull: true
    mysqlType: "datetime"
  }
}
举报

相关推荐

0 条评论