springboot+canal+rocketmq+redis集成
前言
前面将canal与redis直接集成进了springboot,我们需要在代码中建立canal的连接。
考虑到canal中对mq的支持,我们再实现一个通过mq去实现redis与mq消息的同步。
提示:以下是本篇文章正文内容,下面案例可供参考
一、修改canal服务的配置
修改cana.properties文件,加入rocketmq的配置
修改example实例下的instance.properties文件,指定rocketmq的topic
重启canal服务
二、springboot中集成rocketmq
1.引入依赖
代码如下(示例):
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
添加rocketmq配置
在application.properties中加入rocketmq相关配置
# 请填写自己的rocketmq相关ip端口
rocketmq.name-server=rocketmq.lee.com:9876
# 这里指定的生产组需要与canal.properties中rocketmq.producer.group配置一致
rocketmq.producer.group=test
rocketmq.producer.max-message-size=131072
rocketmq.producer.send-message-timeout=3000
添加消费者监听类
代码如下(示例):
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lee.canal.entity.CanalDtoEntity;
import com.lee.canal.entity.UserEntity;
import com.lee.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 创建消费者监听
* @RocketMQMessageListener中topic为canal中配置的topic
* @author 联想
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "example",consumerGroup = "example",selectorExpression="*")
public class UserServiceRocketMqImpl implements RocketMQListener<MessageExt> {
/**
* 将消息同步到redis中
* 消息基本结构
* {
* "data":[
* {
* "id":"10",
* "username":"bffafaaa",
* "telephone":"12345"
* }
* ],
* "database":"canal_test",
* "es":1649838746000,
* "id":10,
* "isDdl":false,
* "mysqlType":{
* "id":"int(11)",
* "username":"varchar(60)",
* "telephone":"varchar(30)"
* },
* "old":[
* {
* "username":"bffafaa"
* }
* ],
* "pkNames":[
* "id"
* ],
* "sql":"",
* "sqlType":{
* "id":4,
* "username":12,
* "telephone":12
* },
* "table":"t_user",
* "ts":1649838747174,
* "type":"UPDATE"
* }
* @param msg
*/
@Override
public void onMessage(MessageExt msg) {
String message = new String(msg.getBody());
log.info("收到消息{}", message);
// 将消息转换为 java对象
CanalDtoEntity entity = JSONObject.parseObject(message, CanalDtoEntity.class);
// 判断是否是ddl语句
if(!entity.isDdl()){
// 获取表名
String tableName = entity.getTable();
// 获取数据
List<UserEntity> datas = entity.getData();
// 根据type类型判断为增删改操作
if("INSERT".equals(entity.getType()) || "UPDATE".equals(entity.getType())){
for (UserEntity u: datas ) {
RedisUtils.stringSet(tableName+":"+u.getId(),JSON.toJSONString(u));
}
}else if("DELETE".equals(entity.getType())){
for (UserEntity u: datas ) {
RedisUtils.delKey(tableName+":"+u.getId());
}
}
}
}
}
对应的entity代码
import lombok.Data;
import lombok.ToString;
import java.util.List;
/**
* canal传输的user整体结构
* @author 联想
*/
@Data
@ToString
public class CanalDtoEntity {
private List<UserEntity> data;
private String database;
private long es;
private int id;
//是否是DDL语句
private boolean isDdl;
//表结构的字段类型
private MysqlTypeEntity mysqlType;
//UPDATE语句,旧数据
private String old;
//主键名称
private List<String> pkNames;
//sql语句
private String sql;
private SqlTypeEntity sqlType;
//表名
private String table;
private long ts;
//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
private String type;
}
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class MysqlTypeEntity {
private String id;
private String username;
private String telephone;
}
@Data
@ToString
public class SqlTypeEntity {
private Integer id;
private Integer username;
private Integer telephone;
}
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
/**
* @author 联想
*/
@Data
@ToString
public class UserEntity implements Serializable {
private Long id;
private String username;
private String telephone;
}
启动springboot
验证截图
依次执行sql
insert into t_user(username,telephone) VALUES ("张三","12345678910");
update t_user set username = "胡八一";
DELETE from t_user where username = "胡八一";
控制台日志
rocket控制台截图
redis截图