Flink CDC: Syncing Oracle Data

香小蕉 · 2022-03-12
一、Enable Archive Logging

  1. Log in to the database as a DBA
    sqlplus / AS SYSDBA
  2. Enable archive logging

Set the size and directory of the recovery area (archive destination):

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/oradata/dg01/recovery_area' scope=spfile;
alter system set db_recovery_file_dest_size=41820M scope=spfile;

Restart the database instance and switch it to archivelog mode:

shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

Check the archiving status:

archive log list;
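If archiving is on, the output should look roughly like the following (sequence numbers and the destination depend on your environment):

Database log mode              Archive Mode
Automatic archival             Enabled
Archive destination            USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence     113
Next log sequence to archive   115
Current log sequence           115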

  3. Enable supplemental logging

Enable it for a single table:

ALTER TABLE schema.table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Enable it for the entire database (minimal logging):

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

Enable ALL-column supplemental logging for the whole database (recommended):

alter database add supplemental log data (all) columns;

Check whether it is enabled:

select supplemental_log_data_all from v$database;

Drop ALL-column supplemental logging:

alter database drop supplemental log data (all) columns;

二、Create an Oracle User and Grant Privileges

  1. Create a tablespace
    CREATE TABLESPACE logminer_tbs DATAFILE '/oradata/dg01/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

  2. Create the user and grant privileges
    CREATE USER flink IDENTIFIED BY flink DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
    GRANT CREATE SESSION TO flink;
    GRANT SET CONTAINER TO flink;
    GRANT SELECT ON V_$DATABASE TO flink;
    GRANT FLASHBACK ANY TABLE TO flink;
    GRANT SELECT ANY TABLE TO flink;
    GRANT SELECT_CATALOG_ROLE TO flink;
    GRANT EXECUTE_CATALOG_ROLE TO flink;
    GRANT SELECT ANY TRANSACTION TO flink;
    GRANT LOGMINING TO flink;
    GRANT EXECUTE ON DBMS_LOGMNR TO flink;
    GRANT EXECUTE ON DBMS_LOGMNR_D TO flink;
    GRANT SELECT ON V_$LOG TO flink;
    GRANT SELECT ON V_$LOG_HISTORY TO flink;
    GRANT SELECT ON V_$LOGMNR_LOGS TO flink;
    GRANT SELECT ON V_$LOGMNR_CONTENTS TO flink;
    GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flink;
    GRANT SELECT ON V_$LOGFILE TO flink;
    GRANT SELECT ON V_$ARCHIVED_LOG TO flink;
    GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flink;
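As a quick sanity check (not part of the original setup), you can connect as the new flink user and list the privileges active in the session:

sqlplus flink/flink
SELECT * FROM SESSION_PRIVS ORDER BY PRIVILEGE;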

三、Code

  1. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>flink-cdc</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.12.1</flink.version>
    <target.java.version>1.8</target.java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-oracle-cdc</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.22</version>
        <!--            <scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.20</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
        <!--            <scope>provided</scope>-->
    </dependency>

    <!-- Hadoop-->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.9.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.9.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.9.2</version>
    </dependency>
</dependencies>

<build>
    <plugins>

        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>${target.java.version}</source>
                <target>${target.java.version}</target>
            </configuration>
        </plugin>

        <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
        <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <!-- Run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>org.apache.logging.log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>spendreport.FraudDetectionJob</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>

    <pluginManagement>
        <plugins>

            <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
            <plugin>
                <groupId>org.eclipse.m2e</groupId>
                <artifactId>lifecycle-mapping</artifactId>
                <version>1.0.0</version>
                <configuration>
                    <lifecycleMappingMetadata>
                        <pluginExecutions>
                            <pluginExecution>
                                <pluginExecutionFilter>
                                    <groupId>org.apache.maven.plugins</groupId>
                                    <artifactId>maven-shade-plugin</artifactId>
                                    <versionRange>[3.0.0,)</versionRange>
                                    <goals>
                                        <goal>shade</goal>
                                    </goals>
                                </pluginExecutionFilter>
                                <action>
                                    <ignore/>
                                </action>
                            </pluginExecution>
                            <pluginExecution>
                                <pluginExecutionFilter>
                                    <groupId>org.apache.maven.plugins</groupId>
                                    <artifactId>maven-compiler-plugin</artifactId>
                                    <versionRange>[3.1,)</versionRange>
                                    <goals>
                                        <goal>testCompile</goal>
                                        <goal>compile</goal>
                                    </goals>
                                </pluginExecutionFilter>
                                <action>
                                    <ignore/>
                                </action>
                            </pluginExecution>
                        </pluginExecutions>
                    </lifecycleMappingMetadata>
                </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
</build>
</project>
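With this POM the shade plugin is bound to the package phase, so the fat jar is produced by a plain build:

mvn clean package

Note that the <mainClass> value spendreport.FraudDetectionJob is a leftover from the Flink quickstart archetype; change it to your own entry class (for this post, the Demo class below) before submitting the jar.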
  2. Demo.java
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

/**
 * @author Endless
 * Note: the parallelism must be set to 1, otherwise the change events may be applied out of order.
 */
public class Demo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("debezium.log.mining.strategy", "online_catalog");
        props.setProperty("debezium.log.mining.continuous.mine", "true");
        DebeziumSourceFunction<String> sourceFunction = OracleSource.<String>builder()
                .hostname("ip")
                .port(1521)
                .database("database")
                .schemaList("schema")
                .tableList("schema.table1, schema.table2")
                .username("flink")
                .password("flink")
                .debeziumProperties(props)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // User used to access HDFS
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        // Enable checkpointing
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint storage location
        env.setStateBackend(new FsStateBackend("hdfs://ip:9000/user/bd/flink/checkpoint/", true));
        // Retain checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Source
        DataStreamSource<String> source = env.addSource(sourceFunction);
        // Parse each change event and re-serialize it as JSON
        SingleOutputStreamOperator<String> archiveLog = source.map((MapFunction<String, String>) json -> {
            ArchiveLog event = JSON.parseObject(json, ArchiveLog.class);
            return JSON.toJSONString(event);
        });

        // Write the change events to TiDB
        archiveLog.addSink(new SinkToTiDB());

        env.execute("flink cdc");
    }

    private static class SinkToTiDB extends RichSinkFunction<String> {
        private transient DruidDataSource dataSource = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Database connection pool
            dataSource = new DruidDataSource();
            dataSource.setDriverClassName("com.mysql.jdbc.Driver");
            dataSource.setUsername("username");
            dataSource.setPassword("password");
            dataSource.setUrl("jdbc:mysql://ip:port/database?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false");
            dataSource.setMaxActive(5);
        }

        @Override
        public void invoke(String json, Context context) throws Exception {
            ArchiveLog archiveLog = JSON.parseObject(json, ArchiveLog.class);

            String op = archiveLog.getOp();
            ArchiveLogSource source = archiveLog.getSource();
            String after = archiveLog.getAfter();
            JSONObject jsonObject = JSON.parseObject(after);
            String sql = "";
            switch (op) {
                // insert
                case "c":
                    System.out.println("insert");
                    StringBuilder keyBuilder = new StringBuilder();
                    StringBuilder valueBuilder = new StringBuilder();
                    for (String item : jsonObject.keySet()) {
                        keyBuilder.append(item).append(",");
                        valueBuilder.append("'").append(jsonObject.get(item)).append("'").append(",");
                    }
                    String key = keyBuilder.substring(0, keyBuilder.length() - 1);
                    String value = valueBuilder.substring(0, valueBuilder.length() - 1);
                    sql = "insert into " + source.getSchema() + "." + source.getTable() + "(" + key + ") values(" + value + ")";
                    break;
                // update
                case "u":
                    System.out.println("update");
                    StringBuilder updateBuilder = new StringBuilder();
                    StringBuilder idBuilder = new StringBuilder();
                    for (String item : jsonObject.keySet()) {
                        if (item.equalsIgnoreCase("id")) {
                            idBuilder.append("'").append(jsonObject.get(item)).append("'");
                        } else {
                            updateBuilder.append(item).append("=").append("'").append(jsonObject.get(item)).append("'").append(",");
                        }
                    }
                    String keyValue = updateBuilder.substring(0, updateBuilder.length() - 1);
                    String id = idBuilder.toString();
                    System.out.println(keyValue);
                    sql = "update " + source.getSchema() + "." + source.getTable() + " set " + keyValue + " where id = " + id;
                    break;
                // delete
                case "d":
                    System.out.println("delete");
                    String before = archiveLog.getBefore();
                    JSONObject deleteObj = JSON.parseObject(before);
                    String deleteId = deleteObj.get("ID").toString();
                    sql = "delete from " + source.getSchema() + "." + source.getTable() + " where id = '" + deleteId + "'";
                    break;
                // snapshot read: nothing to replay
                case "r":
                    System.out.println("snapshot read");
                    break;
            }
            if (sql.isEmpty()) {
                return;
            }
            Connection conn = null;
            PreparedStatement ps = null;
            try {
                conn = dataSource.getConnection();
                ps = conn.prepareStatement(sql);
                ps.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (ps != null) {
                    try {
                        ps.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
                if (conn != null) {
                    conn.close();
                }
            }
        }
    }
}
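The sink above assembles SQL by string concatenation, which breaks on values containing single quotes and writes NULLs as the literal string 'null'. As a sketch of an alternative (not part of the original post; the InsertHelper class and its insert method are illustrative names), the insert branch could bind values through a parameterized statement instead:

import com.alibaba.fastjson.JSONObject;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative helper: a parameterized version of the "c" (insert) branch.
// Column and table names still come from the change event, so they should be
// validated or whitelisted upstream before being spliced into the statement.
public final class InsertHelper {

    static void insert(Connection conn, String schema, String table, JSONObject after) throws Exception {
        List<String> columns = new ArrayList<>(after.keySet());
        String columnList = String.join(",", columns);
        String placeholders = String.join(",", Collections.nCopies(columns.size(), "?"));
        String sql = "insert into " + schema + "." + table + " (" + columnList + ") values (" + placeholders + ")";

        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (int i = 0; i < columns.size(); i++) {
                // setObject handles NULLs and quoting; no manual '...' wrapping needed
                ps.setObject(i + 1, after.get(columns.get(i)));
            }
            ps.execute();
        }
    }

    private InsertHelper() {
    }
}

The update and delete branches can be parameterized the same way.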

  3. ArchiveLog.java

import lombok.Data;
import lombok.ToString;

/**
 * @author Endless
 */
@Data
@ToString
public class ArchiveLog {
    // before/after are kept as raw JSON strings and re-parsed per table
    private String before;
    private String after;
    private ArchiveLogSource source;
    private String op;
    private String ts_ms;
    private String transaction;
}
  4. ArchiveLogSource.java

import lombok.Data;
import lombok.ToString;

/**
 * @author Endless
 */
@Data
@ToString
public class ArchiveLogSource {
    private String version;
    private String connector;
    private String name;
    private String ts_ms;
    private String snapshot;
    private String db;
    private String sequence;
    private String schema;
    private String table;
    private String txId;
    private String scn;
    private String commit_scn;
    private String lcr_position;
}

四、Additional Notes

  1. Sample change events

Insert:

{
  "before": null,
  "after": {
    "ID": "1",
    "NAME": "1"
  },
  "source": {
    "version": "1.5.4.Final",
    "connector": "oracle",
    "name": "oracle_logminer",
    "ts_ms": 1646652622448,
    "snapshot": "last",
    "db": "DG01",
    "sequence": null,
    "schema": "test",
    "table": "CDCTEST",
    "txId": null,
    "scn": "46495548600",
    "commit_scn": null,
    "lcr_position": null
  },
  "op": "c",
  "ts_ms": 1646652622456,
  "transaction": null
}

Update:

{
  "before": {
    "ID": "1",
    "NAME": "1"
  },
  "after": {
    "ID": "1",
    "NAME": "2"
  },
  "source": {
    "version": "1.5.4.Final",
    "connector": "oracle",
    "name": "oracle_logminer",
    "ts_ms": 1646680890000,
    "snapshot": "false",
    "db": "DG01",
    "sequence": null,
    "schema": "test",
    "table": "CDCTEST",
    "txId": "0a0009007f231200",
    "scn": "46495572789",
    "commit_scn": "46495590649",
    "lcr_position": null
  },
  "op": "u",
  "ts_ms": 1646652829683,
  "transaction": null
}

Delete:

{
  "before": {
    "ID": "1",
    "NAME": "2"
  },
  "after": null,
  "source": {
    "version": "1.5.4.Final",
    "connector": "oracle",
    "name": "oracle_logminer",
    "ts_ms": 1646819782000,
    "snapshot": "false",
    "db": "DG01",
    "sequence": null,
    "schema": "FLINK",
    "table": "CDC2",
    "txId": "0a00140054270000",
    "scn": "2491112",
    "commit_scn": "2491120",
    "lcr_position": null
  },
  "op": "d",
  "ts_ms": 1646791645954,
  "transaction": null
}

Read (initial snapshot):

{
  "before": null,
  "after": {
    "ID": "1",
    "NAME": "1"
  },
  "source": {
    "version": "1.5.4.Final",
    "connector": "oracle",
    "name": "oracle_logminer",
    "ts_ms": 1646652622448,
    "snapshot": "last",
    "db": "DG01",
    "sequence": null,
    "schema": "test",
    "table": "CDCTEST",
    "txId": null,
    "scn": "46495548600",
    "commit_scn": null,
    "lcr_position": null
  },
  "op": "r",
  "ts_ms": 1646652622456,
  "transaction": null
}
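To make the mapping between these events and the POJOs in section 三 concrete, here is a minimal sketch (assuming the ArchiveLog and ArchiveLogSource classes above and fastjson on the classpath; it relies on the same behavior as Demo.java, where fastjson keeps the nested before/after objects as JSON strings) that parses an abbreviated insert event:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class ArchiveLogParseDemo {
    public static void main(String[] args) {
        // Abbreviated insert event, same shape as the samples above
        String json = "{\"before\":null,"
                + "\"after\":{\"ID\":\"1\",\"NAME\":\"1\"},"
                + "\"source\":{\"db\":\"DG01\",\"schema\":\"test\",\"table\":\"CDCTEST\"},"
                + "\"op\":\"c\"}";

        ArchiveLog event = JSON.parseObject(json, ArchiveLog.class);

        // "c" -> insert, "u" -> update, "d" -> delete, "r" -> snapshot read
        System.out.println("op    = " + event.getOp());
        System.out.println("table = " + event.getSource().getSchema() + "." + event.getSource().getTable());

        // The after image is re-parsed from its JSON string, as the sink does per table
        JSONObject after = JSON.parseObject(event.getAfter());
        System.out.println("ID    = " + after.getString("ID"));
        System.out.println("NAME  = " + after.getString("NAME"));
    }
}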
