前提：需要先开启 MySQL 的 binlog 功能，可用下面的语句确认 log_bin 是否为 ON：
show variables like 'log_bin%';
添加依赖
<!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java -->
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.21.0</version>
</dependency>
在代码中添加监听：
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.StrUtil;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.jeecg.common.util.RedisUtil;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
/**
 * Listens to the MySQL binary log and evicts Redis cache entries for watched tables.
 *
 * <p>Row events (write/update/delete) are only printed. Cache eviction is driven by
 * {@link QueryEventData}, i.e. by the SQL text of the statement.
 * NOTE(review): SQL text for DML is presumably only present when the server logs
 * statements (binlog_format STATEMENT/MIXED) - confirm against the server configuration.
 */
@Component
public class ListenerMysql {

    /** Redis helper used to drop cached entries. */
    @Resource
    private RedisUtil redisUtil;

    /** Lower-case names of the tables whose cache is evicted when they change. */
    private static final List<String> TABLE_NAME = ListUtil.of("demo");

    /** Only statements whose default database equals this name are inspected. */
    private static final String WATCHED_DATABASE = "jeecg-boot";

    /**
     * Starts the binlog listener thread.
     *
     * <p>Runs as a post-construct callback rather than during construction, so that
     * {@code redisUtil} has been injected before the first event can be handled.
     */
    @PostConstruct
    private void startListener() {
        System.out.println("启动监听:启动中");
        getThread().start();
        // Only means the thread was started; the connection itself is made asynchronously.
        System.out.println("启动监听:成功");
    }

    /**
     * Builds (but does not start) a thread that connects a {@link BinaryLogClient} and
     * handles incoming events until the connection ends.
     *
     * @return a new, unstarted thread; every call creates its own client
     */
    public Thread getThread() {
        BinaryLogClient client = new BinaryLogClient("IP", 3306, "账号", "密码");
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
        client.setEventDeserializer(eventDeserializer);
        // Optionally pin the starting point (the position is a numeric offset, not a path):
        // client.setBinlogFilename("mysql-bin.000014");
        // client.setBinlogPosition(4L);
        // NOTE(review): the original note said the client reads from the very beginning
        // when these are unset; the library Javadoc describes a null filename as resolving
        // to the latest known binlog position - confirm for the version in use.
        return new Thread(() -> {
            client.registerEventListener(this::handleEvent);
            try {
                // Connects and keeps delivering events to the listener on this thread.
                client.connect();
            } catch (IOException e) {
                System.err.println("启动监听:失败");
                e.printStackTrace();
            }
        }, "mysql-binlog-listener");
    }

    /** Dispatches one binlog event: row events are printed, query events may evict cache. */
    private void handleEvent(Event event) {
        final EventData data = event.getData();
        if (data instanceof WriteRowsEventData
                || data instanceof UpdateRowsEventData
                || data instanceof DeleteRowsEventData) {
            System.out.println(data);
        } else if (data instanceof QueryEventData) {
            handleQuery((QueryEventData) data);
        }
    }

    /** Evicts the cache of the table touched by the statement, if that table is watched. */
    private void handleQuery(QueryEventData queryEventData) {
        if (!WATCHED_DATABASE.equals(queryEventData.getDatabase())) {
            return;
        }
        String tableName = extractTableName(queryEventData.getSql());
        if (tableName != null && TABLE_NAME.contains(tableName)) {
            // Drop the cache entries kept under this table's prefix.
            redisUtil.removeAll(StrUtil.format("CACHE:{}:", tableName));
        }
    }

    /**
     * Extracts the target table of a simple INSERT / DELETE / UPDATE statement.
     *
     * <p>The table is taken by position: the third token for {@code INSERT INTO t} and
     * {@code DELETE FROM t}, the second for {@code UPDATE t}. Statements with modifiers
     * between the verb and the table (e.g. {@code INSERT IGNORE INTO t}) are not recognized.
     *
     * @param sql statement text, may be {@code null}
     * @return the lower-case, unquoted table name, or {@code null} when the statement is
     *         not one of the three kinds or is too short to contain a table name
     */
    private static String extractTableName(String sql) {
        if (sql == null) {
            return null;
        }
        // Split on any whitespace run so tabs, newlines and double spaces do not shift positions.
        String[] tokens = sql.trim().split("\\s+");
        int tableIndex;
        switch (tokens[0].toUpperCase(Locale.ROOT)) {
            case "INSERT":
            case "DELETE":
                tableIndex = 2;
                break;
            case "UPDATE":
                tableIndex = 1;
                break;
            default:
                return null;
        }
        if (tokens.length <= tableIndex) {
            return null;
        }
        return normalizeTableName(tokens[tableIndex]);
    }

    /**
     * Reduces a raw table token such as {@code `db`.`demo`(id} to the bare lower-case name.
     * A schema qualifier is dropped, so a same-named table in another schema also matches;
     * that can only cause an extra eviction.
     */
    private static String normalizeTableName(String token) {
        String name = token.replace("`", "").replace("'", "");
        // "INSERT INTO demo(id, ...)" glues the column list to the table name.
        int paren = name.indexOf('(');
        if (paren >= 0) {
            name = name.substring(0, paren);
        }
        int dot = name.lastIndexOf('.');
        if (dot >= 0) {
            name = name.substring(dot + 1);
        }
        return name.toLowerCase(Locale.ROOT);
    }
}