Syncing Oracle Data with Flink CDC
1. Enable Archive Logging
- Connect to the database as a DBA
sqlplus / AS SYSDBA
- Enable archive log mode
Set the archive log size and destination directory
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/oradata/dg01/recovery_area' scope=spfile;
alter system set db_recovery_file_dest_size=41820M scope=spfile;
Restart the database instance and switch to archive log mode
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
Check the archive log status
archive log list;
- Enable supplemental logging
Enable for a single table
ALTER TABLE schema.table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
Enable for the entire database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
All-column supplemental logging
Enable (ALL) COLUMNS supplemental logging (recommended)
alter database add supplemental log data (all) columns;
Check whether it is enabled
select supplemental_log_data_all from v$database;
Drop (ALL) COLUMNS supplemental logging
alter database drop supplemental log data (all) columns;
2. Create an Oracle User and Grant Privileges
- Create a tablespace
CREATE TABLESPACE logminer_tbs DATAFILE '/oradata/dg01/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
- Create the user and grant privileges
CREATE USER flink IDENTIFIED BY flink DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flink;
GRANT SET CONTAINER TO flink;
GRANT SELECT ON V_$DATABASE TO flink;
GRANT FLASHBACK ANY TABLE TO flink;
GRANT SELECT ANY TABLE TO flink;
GRANT SELECT_CATALOG_ROLE TO flink;
GRANT EXECUTE_CATALOG_ROLE TO flink;
GRANT SELECT ANY TRANSACTION TO flink;
GRANT LOGMINING TO flink;
GRANT CREATE TABLE TO flink;
GRANT LOCK ANY TABLE TO flink;
GRANT ALTER ANY TABLE TO flink;
GRANT CREATE SEQUENCE TO flink;
GRANT EXECUTE ON DBMS_LOGMNR TO flink;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flink;
GRANT SELECT ON V_$LOG TO flink;
GRANT SELECT ON V_$LOG_HISTORY TO flink;
GRANT SELECT ON V_$LOGMNR_LOGS TO flink;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flink;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flink;
GRANT SELECT ON V_$LOGFILE TO flink;
GRANT SELECT ON V_$ARCHIVED_LOG TO flink;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flink;
3. Code
- pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink-cdc</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<scala.binary.version>2.12</scala.binary.version>
<flink.version>1.12.1</flink.version>
<target.java.version>1.8</target.java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.22</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.20</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
<!-- <scope>provided</scope>-->
</dependency>
<!-- Hadoop-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.9.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>spendreport.FraudDetectionJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.0.0,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
- Demo.java
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

/**
 * @author Endless
 *
 * Note: the parallelism must be set to 1, otherwise events are not processed in their original order.
 */
public class Demo {

    public static void main(String[] args) throws Exception {
        Properties pros = new Properties();
        pros.setProperty("debezium.log.mining.strategy", "online_catalog");
        pros.setProperty("debezium.log.mining.continuous.mine", "true");
        DebeziumSourceFunction<String> sourceFunction = OracleSource.<String>builder()
                .hostname("ip")
                .port(1521)
                .database("database")
                .schemaList("schema")
                .tableList("schema.table1,schema.table2")
                .username("flink")
                .password("flink")
                .debeziumProperties(pros)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // User for accessing Hadoop (HDFS checkpoint storage)
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        // Enable checkpointing
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint storage location
        env.setStateBackend(new FsStateBackend("hdfs://ip:9000/user/bd/flink/checkpoint/", true));
        // Retain externalized checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Source
        DataStreamSource<String> source = env.addSource(sourceFunction);
        SingleOutputStreamOperator<String> archiveLog = source.map((MapFunction<String, String>) json -> {
            ArchiveLog archiveLog1 = JSON.parseObject(json, ArchiveLog.class);
            return JSON.toJSONString(archiveLog1);
        });
        // Write the change messages to TiDB
        archiveLog.addSink(new SinkToTiDB());
        env.execute("flink cdc");
    }
    private static class SinkToTiDB extends RichSinkFunction<String> {
        private transient DruidDataSource dataSource = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Database connection pool
            dataSource = new DruidDataSource();
            dataSource.setDriverClassName("com.mysql.jdbc.Driver");
            dataSource.setUsername("username");
            dataSource.setPassword("password");
            dataSource.setUrl("jdbc:mysql://ip:port/database?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false");
            dataSource.setMaxActive(5);
        }

        @Override
        public void invoke(String json, Context context) throws Exception {
            ArchiveLog archiveLog = JSON.parseObject(json, ArchiveLog.class);
            String op = archiveLog.getOp();
            ArchiveLogSource source = archiveLog.getSource();
            String after = archiveLog.getAfter();
            JSONObject jsonObject = JSON.parseObject(after);
            String sql = "";
            switch (op) {
                // insert
                case "c":
                    System.out.println("insert");
                    StringBuilder keyBuilder = new StringBuilder();
                    StringBuilder valueBuilder = new StringBuilder();
                    for (String item : jsonObject.keySet()) {
                        keyBuilder.append(item).append(",");
                        valueBuilder.append("'").append(jsonObject.get(item)).append("'").append(",");
                    }
                    String key = keyBuilder.substring(0, keyBuilder.length() - 1);
                    String value = valueBuilder.substring(0, valueBuilder.length() - 1);
                    sql = "insert into " + source.getSchema() + "." + source.getTable() + "(" + key + ") values(" + value + ")";
                    break;
                // update
                case "u":
                    System.out.println("update");
                    StringBuilder updateBuilder = new StringBuilder();
                    StringBuilder idBuilder = new StringBuilder();
                    for (String item : jsonObject.keySet()) {
                        if (item.equalsIgnoreCase("id")) {
                            idBuilder.append("'").append(jsonObject.get(item)).append("'");
                        } else {
                            updateBuilder.append(item).append("=").append("'").append(jsonObject.get(item)).append("'").append(",");
                        }
                    }
                    String keyValue = updateBuilder.substring(0, updateBuilder.length() - 1);
                    String id = idBuilder.toString();
                    System.out.println(keyValue);
                    sql = "update " + source.getSchema() + "." + source.getTable() + " set " + keyValue + " where id =" + id;
                    break;
                // delete
                case "d":
                    String before = archiveLog.getBefore();
                    JSONObject deleteObj = JSON.parseObject(before);
                    id = deleteObj.get("ID").toString();
                    System.out.println("delete");
                    sql = "delete from " + source.getSchema() + "." + source.getTable() + " where id = '" + id + "'";
                    break;
                // snapshot read, nothing to write
                case "r":
                    System.out.println("read");
                    break;
            }
            if (sql.isEmpty()) {
                return;
            }
            Connection conn = null;
            PreparedStatement ps = null;
            try {
                conn = dataSource.getConnection();
                ps = conn.prepareStatement(sql);
                ps.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (ps != null) {
                    try {
                        ps.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
                if (conn != null) {
                    conn.close();
                }
            }
        }
    }
}
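The sink above assembles SQL by concatenating column values into the statement text, which breaks on values that contain single quotes and is open to SQL injection. As a minimal sketch of an alternative (not part of the original job; the class name ParameterizedInsertSketch and the method insertRow are illustrative), the insert branch could bind values through PreparedStatement parameters instead; only the column names, which come from the Debezium payload, are still concatenated. The same pattern applies to the update and delete branches.

import com.alibaba.fastjson.JSONObject;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;

public class ParameterizedInsertSketch {

    // Builds "insert into schema.table(col1,col2,...) values(?,?,...)" from the 'after'
    // image of a change event and binds the values instead of inlining them as literals.
    static void insertRow(Connection conn, String schema, String table, JSONObject after) throws Exception {
        List<String> columns = new ArrayList<>(after.keySet());
        StringBuilder cols = new StringBuilder();
        StringBuilder marks = new StringBuilder();
        for (String col : columns) {
            if (cols.length() > 0) {
                cols.append(",");
                marks.append(",");
            }
            cols.append(col);
            marks.append("?");
        }
        String sql = "insert into " + schema + "." + table + "(" + cols + ") values(" + marks + ")";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            // Bind values in the same column order used to build the statement
            for (int i = 0; i < columns.size(); i++) {
                ps.setObject(i + 1, after.get(columns.get(i)));
            }
            ps.execute();
        }
    }
}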
- ArchiveLog.java
/**
 * @author Endless
 */
@Data
@ToString
public class ArchiveLog {
private String before;
private String after;
private ArchiveLogSource source;
private String op;
private String ts_ms;
private String transaction;
}
- ArchiveLogSource.java
/**
 * @author Endless
 */
@Data
@ToString
public class ArchiveLogSource {
private String version;
private String connector;
private String name;
private String ts_ms;
private String snapshot;
private String db;
private String sequence;
private String schema;
private String table;
private String txId;
private String scn;
private String commit_scn;
private String lcr_position;
}
4. Additional Notes
- Change event messages (archive log records)
Insert
{
  "before": null,
  "after": {
    "ID": "1",
    "NAME": "1"
  },
  "source": {
    "version": "1.5.4.Final",
    "connector": "oracle",
    "name": "oracle_logminer",
    "ts_ms": 1646652622448,
    "snapshot": "last",
    "db": "DG01",
    "sequence": null,
    "schema": "test",
    "table": "CDCTEST",
    "txId": null,
    "scn": "46495548600",
    "commit_scn": null,
    "lcr_position": null
  },
  "op": "c",
  "ts_ms": 1646652622456,
  "transaction": null
}
Update
{
  "before": {
    "ID": "1",
    "NAME": "1"
  },
  "after": {
    "ID": "1",
    "NAME": "2"
  },
  "source": {
    "version": "1.5.4.Final",
    "connector": "oracle",
    "name": "oracle_logminer",
    "ts_ms": 1646680890000,
    "snapshot": "false",
    "db": "DG01",
    "sequence": null,
    "schema": "test",
    "table": "CDCTEST",
    "txId": "0a0009007f231200",
    "scn": "46495572789",
    "commit_scn": "46495590649",
    "lcr_position": null
  },
  "op": "u",
  "ts_ms": 1646652829683,
  "transaction": null
}
Delete
{
  "before": {
    "ID": "1",
    "NAME": "2"
  },
  "after": null,
  "source": {
    "version": "1.5.4.Final",
    "connector": "oracle",
    "name": "oracle_logminer",
    "ts_ms": 1646819782000,
    "snapshot": "false",
    "db": "DG01",
    "sequence": null,
    "schema": "FLINK",
    "table": "CDC2",
    "txId": "0a00140054270000",
    "scn": "2491112",
    "commit_scn": "2491120",
    "lcr_position": null
  },
  "op": "d",
  "ts_ms": 1646791645954,
  "transaction": null
}
Read (snapshot)
{
  "before": null,
  "after": {
    "ID": "1",
    "NAME": "1"
  },
  "source": {
    "version": "1.5.4.Final",
    "connector": "oracle",
    "name": "oracle_logminer",
    "ts_ms": 1646652622448,
    "snapshot": "last",
    "db": "DG01",
    "sequence": null,
    "schema": "test",
    "table": "CDCTEST",
    "txId": null,
    "scn": "46495548600",
    "commit_scn": null,
    "lcr_position": null
  },
  "op": "r",
  "ts_ms": 1646652622456,
  "transaction": null
}
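To make the mapping between these payloads and the ArchiveLog/ArchiveLogSource classes concrete, here is a minimal, self-contained sketch. The class name ParseEventSketch is illustrative and the JSON string is an abbreviated version of the insert sample above; it assumes fastjson maps the nested before/after objects into the String fields the same way the sink relies on.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class ParseEventSketch {
    public static void main(String[] args) {
        // Abbreviated insert event taken from the samples above
        String json = "{\"before\":null,"
                + "\"after\":{\"ID\":\"1\",\"NAME\":\"1\"},"
                + "\"source\":{\"db\":\"DG01\",\"schema\":\"test\",\"table\":\"CDCTEST\"},"
                + "\"op\":\"c\"}";

        ArchiveLog event = JSON.parseObject(json, ArchiveLog.class);
        System.out.println("table: " + event.getSource().getSchema() + "." + event.getSource().getTable());

        // "c" = insert, "u" = update, "d" = delete, "r" = snapshot read
        switch (event.getOp()) {
            case "c":
            case "r":
                JSONObject after = JSON.parseObject(event.getAfter());
                System.out.println("new row image: " + after);
                break;
            case "u":
                System.out.println("before: " + event.getBefore() + ", after: " + event.getAfter());
                break;
            case "d":
                System.out.println("deleted row: " + event.getBefore());
                break;
        }
    }
}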