0
点赞
收藏
分享

微信扫一扫

Debezium 嵌入式运行

冶炼厂小练 2022-06-13 阅读 170

      Debezium 通过解析数据库日志获取DDL和DML的操作数据, Debezium connectors可以运行在 Kafka Connect 服务中,同时也支持嵌入应用程序中运行。本文运行一个嵌入式例子。

1 开发环境  eclipse gradle

   build.gradle引入jar包

   dependencies {

​    compile 'org.slf4j:slf4j-api:1.7.21' ​

​    compile 'org.apache.logging.log4j:log4j-api:2.8.2' ​

​    compile 'org.apache.logging.log4j:log4j-core:2.8.2' ​

​    compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.8.2' ​

​    compile 'io.debezium:debezium-api:1.9.3.Final' ​

​    compile 'io.debezium:debezium-embedded:1.9.3.Final' ​

​    compile 'io.debezium:debezium-connector-mysql:1.9.3.Final' ​

​}​

说明:引用log4j包报错时会提示,如果不引入报错程序结束不知道原因。

2 监听mysql数据库日志

  修改Mysql配置文件 (MySql 5.x 以上版本)

  1. 增量捕获功能需要修改 MySQL配置文件 linux为my.cnf或windows为my.ini,添加内容如下,然后重启数据库

[mysqld] log-bin=mysql-bin #添加这一行

binlog-format=ROW #选择row模式

server_id=1 #配置mysql replication需要定义

#GTID支持(5.6及以上版本添加如下三个参数) (单机无需开启gtid)

gtid_mode=on enforce_gtid_consistency=1 log_slave_updates=1

  1. 赋予数据库用户binlog权限,SQL语句如下:

grant REPLICATION SLAVE ON *.* TO '用户名' ;

grant REPLICATION CLIENT ON *.* TO '用户名' ;

3 JAVA 例子

​import io.debezium.engine.ChangeEvent; ​

​import io.debezium.engine.DebeziumEngine; ​

​import io.debezium.engine.format.Json; ​

​import java.util.Properties; ​

​import java.util.concurrent.ExecutorService; ​

​import java.util.concurrent.Executors; ​

​public class MysqlApiCdc { ​

​ public static void main(String[] args) { ​

​  final Properties props = new Properties(); ​

​  props.setProperty("name", "engine"); ​

​  props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector"); ​

​  props.setProperty("offset.storage", ​

​    "org.apache.kafka.connect.storage.FileOffsetBackingStore"); ​

​  props.setProperty("offset.storage.file.filename", "D:/tmp/debeziumdb/offset.dat"); ​

​  props.setProperty("offset.flush.interval.ms", "60000"); ​

​  /* begin connector properties */ ​

​  props.setProperty("database.hostname", "localhost"); ​

​  props.setProperty("database.port", "3306"); ​

​  props.setProperty("database.user", "test"); ​

​  props.setProperty("database.password", "test"); ​

​  props.setProperty("database.include.list", "test"); ​

​  props.setProperty("database.server.id", "1"); ​

​  props.setProperty("database.serverTimezone", "UTC");//连接低版本MySQL ​

​  props.setProperty("database.server.name", "my-app-connector"); ​

​  props.setProperty("database.history", ​

​    "io.debezium.relational.history.FileDatabaseHistory"); ​

​  props.setProperty("database.history.file.filename", ​

​    "D:/tmp/debeziumdb/dbhistory.dat"); ​

​  props.setProperty("key.converter.schemas.enable", "false");//去掉key schema ​

​  props.setProperty("value.converter.schemas.enable", "false");//去掉值 schema ​

​  // Create the engine with this configuration ... ​

​  try{ ​

​   DebeziumEngine<ChangeEvent<String, String>> engine1 = DebeziumEngine ​

​     .create(Json.class).using(props).notifying(record -> { ​

​      System.out.println(record.value()); ​

​     }).build() ; ​

​    // Run the engine asynchronously ... ​

​    ExecutorService executor = Executors.newSingleThreadExecutor(); ​

​    executor.execute(engine1); ​

​  }catch(Exception e){ ​

​   e.printStackTrace(); ​

​  } ​

​ } ​

​}​


举报

相关推荐

0 条评论