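Debezium captures DDL and DML changes by parsing the database log. Debezium connectors can run inside a Kafka Connect service, and they can also be embedded directly in an application. This post walks through an embedded example.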
1 Development environment: Eclipse and Gradle
Add the following dependencies to build.gradle:
dependencies {
    compile 'org.slf4j:slf4j-api:1.7.21'
    compile 'org.apache.logging.log4j:log4j-api:2.8.2'
    compile 'org.apache.logging.log4j:log4j-core:2.8.2'
    compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.8.2'
    compile 'io.debezium:debezium-api:1.9.3.Final'
    compile 'io.debezium:debezium-embedded:1.9.3.Final'
    compile 'io.debezium:debezium-connector-mysql:1.9.3.Final'
}
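Note: with the log4j packages on the classpath, errors are logged when something goes wrong. Without them, the program simply exits on an error and gives no indication of the cause.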
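2 Listening to the MySQL binlog
Modify the MySQL configuration file (MySQL 5.x and later)
- Incremental capture requires a change to the MySQL configuration file (my.cnf on Linux, my.ini on Windows). Add the following settings, then restart the database: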
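[mysqld]
log-bin=mysql-bin   # add this line to enable the binlog
binlog-format=ROW   # use row-based logging
server_id=1   # required for MySQL replication
# GTID support (add the following three parameters on 5.6 and later; not needed on a standalone server)
gtid_mode=on
enforce_gtid_consistency=1
log_slave_updates=1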
- Grant the database user the replication privileges needed to read the binlog, using the following SQL statements:
grant REPLICATION SLAVE ON *.* TO 'username';
grant REPLICATION CLIENT ON *.* TO 'username';
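After restarting MySQL, it can be useful to confirm that the settings above actually took effect before starting Debezium. The following is a minimal sketch, not part of Debezium: the class name BinlogCheck and its methods are hypothetical, the JDBC URL and credentials are assumptions that mirror the values used later in this post, and it assumes a MySQL JDBC driver is on the classpath (the Debezium MySQL connector normally brings one in transitively).

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: checks the binlog-related server variables.
public class BinlogCheck {

    // Returns a list of problems found in the given variables (empty if none).
    public static List<String> problems(Map<String, String> vars) {
        List<String> out = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.get("log_bin"))) {
            out.add("log_bin is not ON");
        }
        if (!"ROW".equalsIgnoreCase(vars.get("binlog_format"))) {
            out.add("binlog_format is not ROW");
        }
        String id = vars.get("server_id");
        if (id == null || "0".equals(id.trim())) {
            out.add("server_id is not set");
        }
        return out;
    }

    // Reads the three variables from a live server.
    public static Map<String, String> readVariables(Connection conn) throws SQLException {
        Map<String, String> vars = new HashMap<>();
        String sql = "SHOW VARIABLES WHERE Variable_name IN "
                + "('log_bin', 'binlog_format', 'server_id')";
        try (Statement st = conn.createStatement(); ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                vars.put(rs.getString(1).toLowerCase(), rs.getString(2));
            }
        }
        return vars;
    }

    public static void main(String[] args) throws SQLException {
        // Assumed connection details; adjust to your environment.
        String url = "jdbc:mysql://localhost:3306/test";
        try (Connection conn = DriverManager.getConnection(url, "test", "test")) {
            List<String> found = problems(readVariables(conn));
            System.out.println(found.isEmpty()
                    ? "binlog settings look fine"
                    : String.join("; ", found));
        }
    }
}
```

The check is split into a pure function (problems) and a JDBC reader so that the rules can be exercised without a running database.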
3 Java example
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MysqlApiCdc {
    public static void main(String[] args) {
        final Properties props = new Properties();
        /* begin engine properties */
        props.setProperty("name", "engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        props.setProperty("offset.storage",
                "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "D:/tmp/debeziumdb/offset.dat");
        props.setProperty("offset.flush.interval.ms", "60000");
        /* begin connector properties */
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "test");
        props.setProperty("database.password", "test");
        props.setProperty("database.include.list", "test");
        // ID of this client; it must be unique within the MySQL cluster,
        // so it differs from server_id=1 in the MySQL configuration.
        props.setProperty("database.server.id", "85744");
        props.setProperty("database.serverTimezone", "UTC"); // for connecting to older MySQL versions
        props.setProperty("database.server.name", "my-app-connector");
        props.setProperty("database.history",
                "io.debezium.relational.history.FileDatabaseHistory");
        props.setProperty("database.history.file.filename",
                "D:/tmp/debeziumdb/dbhistory.dat");
        props.setProperty("key.converter.schemas.enable", "false"); // omit the key schema
        props.setProperty("value.converter.schemas.enable", "false"); // omit the value schema
        // Create the engine with this configuration ...
        try {
            DebeziumEngine<ChangeEvent<String, String>> engine1 = DebeziumEngine
                    .create(Json.class)
                    .using(props)
                    .notifying(record -> {
                        // Print each change event as a JSON string
                        System.out.println(record.value());
                    })
                    .build();
            // Run the engine asynchronously ...
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(engine1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
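The example above submits the engine to an executor but never stops it, so the process keeps running until it is killed. Since DebeziumEngine extends both Runnable and Closeable, one possible way to shut it down cleanly is sketched below. The class EngineRunner, its start method, and the 30-second timeout are hypothetical choices for illustration, not part of the Debezium API.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: runs an engine on its own thread and returns a stop handle.
public class EngineRunner {

    // Starts the engine asynchronously. Running the returned handle closes
    // the engine and then waits for its worker thread to finish.
    public static <E extends Runnable & Closeable> Runnable start(E engine) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        return () -> {
            try {
                engine.close(); // ask the engine to stop so that run() returns
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } finally {
                executor.shutdown(); // accept no new tasks
                try {
                    // Assumed timeout; force-stop the thread if it is exceeded.
                    if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                        executor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
    }
}
```

With this helper, the two executor lines in main could be replaced by Runnable stop = EngineRunner.start(engine1); followed by Runtime.getRuntime().addShutdownHook(new Thread(stop)); so that the engine is closed when the JVM exits, for example on Ctrl+C.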