0
点赞
收藏
分享

微信扫一扫

Elastic实战:canal自定义客户端,实现mysql多表同步到es


0. 引言

我们之前讲解了利用canal实现无代码入侵的同步mysql数据到elasticsearch,并且讲解了主子表数据如何同步。

​​通过canal1.1.5实现mysql8.0数据增量/全量同步到elasticsearch7.x​​canal同步mysql到es之父子表数据同步|对象型数组同步|nested数组同步

但具体生产中,仍然有更加复杂的同步需求,之前也有几位同学咨询过我,因为canal只支持2张表的数据同步,并不支持3张表及以上的同步,当不少的业务需要3表以上的同步,这就需要我们自定义canal客户端来实现了,那么今天我们就来实操演示下自定义canal客户端,实现多表同步

1. canal简介

anal是阿里开源的数据同步工具,基于bin log可以将数据库同步到其他各类数据库中,目标数据库支持mysql,postgresql,oracle,redis,MQ,ES等

canal分成服务端deployer和客户端adapter,我们可以部署多个,同时为了方便管理还提供了一个管理端admin,同时我们还可以自定义客户端,我们讲自定义的客户端称为client

canal的数据同步流程如下图所示

Elastic实战:canal自定义客户端,实现mysql多表同步到es_spring

2. 环境准备

2.1 安装jdk

canal是基于java环境的,因此运行前需要先安装jdk,这里我安装的是jdk11。详细步骤就不再累述了。

canal1.1.5使用jdk1.8即可,以下示例的是canal1.1.6。该版本需要使用jdk11+,否则会报错​​NoSuchMethodError​

2.2 安装canal

1、截止本文,canal的稳定版已更新到1.1.6了, 所以本文也以这个版本为例。

这里因为我们要自定义客户端,所以只用下载服务端deployer即可

​​官方下载地址​​

Elastic实战:canal自定义客户端,实现mysql多表同步到es_elasticsearch_02

当然也可以通过wget指令直接下载到服务器

wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

详细的安装步骤不再累述了,还不清楚的同学可以参考上一篇文章

​​通过canal来实现mysql数据同步到elasticsearch​​

2.3 mysql配置

1、因为同步是基于binlog实现的,所以要现在mysql中开启binlog

修改mysql配置文件

vim /etc/my.cnf

修改内容

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式

2、源数据库创建一个canal账号,并且设置​​slave​​​,​​dump​​权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

Elastic实战:canal自定义客户端,实现mysql多表同步到es_spring_03

3、因为mysql8.0.3后身份检验方式为​​caching_sha2_password​​​,但canal使用的是​​mysql_native_password​​​,因此需要设置检验方式(如果该版本之前的可跳过),否则会报错​​IOException: caching_sha2_password Auth failed​

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
select host,user,plugin from mysql.user ;

3. 实操

3.1 服务端deployer配置

1、查询源mysql服务器的binlog位置

# 源mysql服务器中登陆mysql执行
show binary logs;

Elastic实战:canal自定义客户端,实现mysql多表同步到es_spring_04

2、进入deployer安装目录

cd deployer

3、我们新建一个实例​​es​​专门用于本次演示

cd conf
# 复制example实例配置
cp -R example es

4、修改实例es配置文件instance.properties

cd es
vim instance.properties

修改内容

# position info
# 源数据库地址及端口
canal.instance.master.address=192.168.244.17:3306
# 开始同步的binlog日志文件,注意这里的binlog文件名以你自己查出来的为准
canal.instance.master.journal.name=mysql-bin.000001
# 开始同步的binlog文件位置
canal.instance.master.position=0
# 开始同步时间点 时间戳形式
canal.instance.master.timestamp=1546272000000

# 数据库账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

# 配置不同步mysql库
canal.instance.filter.black.regex=mysql\..*

mysql数据同步起点说明:

  • canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动
  • canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动
  • 不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)

5、启动服务端

./bin/start.sh

6、查看示例日志,无报错则说明启动成功

cat logs/es/es.log

Elastic实战:canal自定义客户端,实现mysql多表同步到es_mysql_05

针对服务端的详细配置项解释,可以参考官方文档:

​​配置项解释​​

Elastic实战:canal自定义客户端,实现mysql多表同步到es_数据库_06

3.2 自定义客户端client

1、新建一个springboot项目,我们结合之前讲解的​​spring-data-elasticsearch​​来作为es客户端,这里就不单独说明其配置了,还不知道的同学可以参考之前的文章

​​从零搭建springboot整合spring data elasticsearch4.2.x环境​​

引入依赖​​spring-data-elasticsearch​​​、​​canal-spring-boot-starter​​​、​​mybatis-plus​

<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-elasticsearch</artifactId>
<version>4.2.10</version>
</dependency>

<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.2</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>

2、修改配置文件​​application.yml​

# 应用名称
spring:
application:
name: canal_client_es
elasticsearch:
rest:
# es 地址
uris: http://192.168.244.11:9200
username: elastic
password: elastic
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
name: defaultDataSource
url: jdbc:mysql://192.168.244.17:3306/canal_test?useSSL=false&useUnicode=true&characterEncoding=utf-8
username: root
password: 123456

server:
port: 8080

# canal服务端地址
canal:
server: 192.168.244.22:11111
# 实例名,与deployer中配置的保持统一
destination: es

# 设置canal消息日志打印级别
logging:
level:
top.javatool.canal.client: warn

3、创建es客户端配置

/**
* @author benjamin
* @date 2022/10/1
*/
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.example.canal_client_es")
public class ElasticRestClientConfig extends AbstractElasticsearchConfiguration {

@Value("${spring.elasticsearch.rest.uris}")
private String url;
@Value("${spring.elasticsearch.rest.username}")
private String username;
@Value("${spring.elasticsearch.rest.password}")
private String password;

@Override
@Bean
public RestHighLevelClient elasticsearchClient() {
url = url.replace("http://","");
String[] urlArr = url.split(",");
HttpHost[] httpPostArr = new HttpHost[urlArr.length];
for (int i = 0; i < urlArr.length; i++) {
HttpHost httpHost = new HttpHost(urlArr[i].split(":")[0].trim(),
Integer.parseInt(urlArr[i].split(":")[1].trim()), "http");
httpPostArr[i] = httpHost;
}
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(username,password));
RestClientBuilder builder = RestClient.builder(httpPostArr)
// 异步httpclient配置
.setHttpClientConfigCallback(httpClientBuilder -> {
// 账号密码登录
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
// httpclient连接数配置
httpClientBuilder.setMaxConnTotal(30);
httpClientBuilder.setMaxConnPerRoute(10);
// httpclient保活策略
httpClientBuilder.setKeepAliveStrategy(((response, context) -> Duration.ofMinutes(5).toMillis()));
return httpClientBuilder;
});
return new RestHighLevelClient(builder);
}

@Bean
public ElasticsearchRestTemplate elasticsearchRestTemplate(RestHighLevelClient elasticsearchClient,ElasticsearchConverter elasticsearchConverter){
return new ElasticsearchRestTemplate(elasticsearchClient,elasticsearchConverter);
}

}

4、实现根据实体类自动创建es索引的配置类,不需要可跳过这步

@Configuration
@Slf4j
@AllArgsConstructor
public class ElasticCreateIndexStartUp implements ApplicationListener<ContextRefreshedEvent> {

private final ElasticsearchRestTemplate restTemplate;

@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent){
log.info("[elastic]索引初始化...");
Reflections f = new Reflections("com.example.canal_client_es.entity");
Set<Class<?>> classSet = f.getTypesAnnotatedWith(Document.class);
for (Class<?> clazz : classSet) {
IndexOperations indexOperations = restTemplate.indexOps(clazz);
if(!indexOperations.exists()){
indexOperations.create();
indexOperations.putMapping();
log.info(String.format("[elastic]索引%s数据结构创建成功",clazz.getSimpleName()));
}
}
log.info("[elastic]索引初始化完毕");
}
}

4、创建订单、商品、收货人实体,其中一个订单下有多个商品、多个收货人,我们希望同步订单表时,将商品、收货人两张表的信息同步更新。

同时因为我们需要与数据库做映射,同时也需要与es做映射,所以需要创建面向mysql和es的实体类,当然你也可以将两种整合到一起(如下所示的商品实体、收货人实体),这里为了让大家清晰的认识,我将其分开(如下所示的订单实体)

es实体类

// 订单实体
@Data
@Document(indexName = "my_order")
@Setting(replicas = 0,shards = 1)
public class Order implements Serializable {

/**
* 主键
*/
@Id
private Long id;

/**
* 订单号
*/
@Field(type = FieldType.Keyword, name="seqNo")
private String seqNo;

/**
* 总价
*/
@Field(type = FieldType.Double, name="totalPrice")
private BigDecimal totalPrice;

/**
* 数量
*/
@Field(type = FieldType.Integer, name="quantity")
private Integer quantity;

/**
* 商品清单
*/
@Field(type = FieldType.Nested, name="productList")
private List<Product> productList;

/**
* 收货人清单
*/
@Field(type = FieldType.Nested, name="userList")
private List<User> userList;

}

// 商品实体
@Data
@Table(name = "product")
public class Product implements Serializable {

@Field(type = FieldType.Long, name="id")
private Long id;

@Field(type = FieldType.Keyword, name="seqNo")
@Column(name = "seq_no")
private String seqNo;

@Field(type = FieldType.Double, name="price")
private BigDecimal price;

@Field(type = FieldType.Text, name="name", analyzer = "ik_smart")
private String name;

}

// 收货人实体
@Data
@Table(name = "user")
public class User implements Serializable {

@Field(type = FieldType.Long, name="id")
private Long id;

@Field(type = FieldType.Keyword, name="seqNo")
@Column(name = "seq_no")
private String seqNo;

@Field(type = FieldType.Keyword, name="name")
private String name;

@Field(type = FieldType.Integer, name="age")
private Integer age;

@Field(type = FieldType.Text, name="address", analyzer = "ik_smart")
private String address;
}

数据库实体,并用jpa的注解​​@Column​​来映射字段名。商品、收货人的数据库实体则整合到es实体中了,如上

@Data
@Table(name = "my_order")
public class OrderPO implements Serializable {

/**
* 主键
*/
@Column(name = "id")
private Long id;

/**
* 订单号
*/
@Column(name = "seq_no")
private String seqNo;

/**
* 总价
*/
@Column(name = "total_price")
private BigDecimal totalPrice;

/**
* 数量
*/
@Column(name = "quantity")
private Integer quantity;

}

5、我们基于mybatis-plus来操作数据库,因此需要创建实体的mapper、service。详细的代码大家按照mybatis-plus的用法创建即可,或者通过本文最后下载源码查看。这里不再累叙。

Elastic实战:canal自定义客户端,实现mysql多表同步到es_数据库_07

6、操作到这里,最好把你的项目启动一下,如果正常则继续往下操作,如果不正常也好提前排错,不要压到最后发现一堆错,也不知道错在哪里。

7、接下来我们基于canal-client提供的​​EntryHandler​​类来实现对于数据表的监控,从而达到数据的增删改同步

@CanalTable("my_order")
@Component
@AllArgsConstructor
@Slf4j
public class OrderHandler implements EntryHandler<OrderPO> {

private final ElasticsearchRestTemplate elasticsearchRestTemplate;

private final IProductService productService;

private final IUserService userService;

@Override
public void insert(OrderPO orderPO) {
Order order = new Order();
BeanUtils.copyProperties(orderPO,order);
List<Product> productList = productService.list(Wrappers.<Product>lambdaQuery().eq(Product::getSeqNo, order.getSeqNo()));
order.setProductList(productList);
List<User> userList = userService.list(Wrappers.<User>lambdaQuery().eq(User::getSeqNo, order.getSeqNo()));
order.setUserList(userList);
elasticsearchRestTemplate.save(order);
}

@Override
public void update(OrderPO before, OrderPO after) {
Order order = new Order();
BeanUtils.copyProperties(after,order);
List<Product> productList = productService.list(Wrappers.<Product>lambdaQuery().eq(Product::getSeqNo, order.getSeqNo()));
order.setProductList(productList);
List<User> userList = userService.list(Wrappers.<User>lambdaQuery().eq(User::getSeqNo, order.getSeqNo()));
order.setUserList(userList);
elasticsearchRestTemplate.save(order);
}

@Override
public void delete(OrderPO orderPO) {
elasticsearchRestTemplate.delete(orderPO.getId().toString(),Order.class);
}
}

3.3 测试

1、新增一条订单数据

Elastic实战:canal自定义客户端,实现mysql多表同步到es_数据库_08

2、kibana中查看索引数据

GET my_order/_search

Elastic实战:canal自定义客户端,实现mysql多表同步到es_spring_09

结果显示新增的订单表同步成功,并且两张子表的数据也成功同步了。

3、再修改一下订单数据

Elastic实战:canal自定义客户端,实现mysql多表同步到es_mysql_10

kibana查看索引,显示同步成功

Elastic实战:canal自定义客户端,实现mysql多表同步到es_mysql_11

4、我们将刚刚新增的订单数据在数据库中删除

Elastic实战:canal自定义客户端,实现mysql多表同步到es_mysql_12

同时kibana中也删除成功,说明我们删除的同步也生效了。

Elastic实战:canal自定义客户端,实现mysql多表同步到es_数据库_13

3.4 子表数据修改,同步主表

上述我们演示了主表数据修改时,同步主表以及两张子表的数据;有时我们需要修改子表数据,但也需要实现数据同步。

这就需要我们实现一个子表的​​EntryHandler​​,用于监听子表的数据变化,其逻辑是子表数据更新时,查询主子表的数据,再同步更新到索引中即可。

注意要监听的是子表,每张子表一个监听器,如果需要监听两张子表,那么就需要分别创建两个监听器

@CanalTable("product")
@Component
@AllArgsConstructor
@Slf4j
public class ProductHandler implements EntryHandler<Product> {

private final ElasticsearchRestTemplate elasticsearchRestTemplate;

@Override
public void insert(Product product) {
// TODO
}

@Override
public void update(Product before, Product after) {
// TODO
}

@Override
public void delete(Product product) {
// TODO

}
}

演示源码

文中演示源码可在如下地址下载:

​​git源码地址​​

总结

自此我们的数据同步就演示完成了,如果有更加复杂的同步逻辑,也可以在代码中自定义实现,并且第三方组件​​canal-spring-boot-starter​​极大的简化了我们自定义canal客户端的难度。

不过遗憾的是canal-spring-boot-starter的作者目前已经停止了对其的维护,其最新版对应的canal实际是​​1.1.3​​版本的,不过实测还不影响我们对接canal1.1.6。如果大家对canal客户端又更高性能的需求,可以研究源码,高度二开。

后续我们将给大家讲解如何实现类​​canal-spring-boot-starter​​这样的第三方依赖组件。感兴趣的同学可以关注专栏。


举报

相关推荐

0 条评论