〇、概述
1、常见资料
(1)confluent:JDBC sink connector配置项文档
https://docs.confluent.io/5.4.0/connect/kafka-connect-jdbc/sink-connector/sink_config_options.html
一、可用的sink/source配置
(一)source connector
1、根据自增id的source
(1)订单表
{
"name": "source_connect_Oracle_Test_T_Order_0905",
"config": {
"connector.class": "com.ecer.kafka.connect.oracle.OracleSourceConnector",
"reset.offset": "true",
"incrementing.column.name": "AUUID_0",
"db.hostname": "192.168.0.100",
"tasks.max": "1",
"db.user.password": "system",
"table.blacklist": "",
"table.whitelist": "TEST.T_Order",
"mode": "incrementing",
"db.user": "system",
"db.port": "1521",
"db.fetch.size": "1",
"multitenant": "false",
"name": "source_connect_Oracle_Test_T_Order_0905",
"topic": "connect_topic_Test_T_Order_0905",
"parse.dml.data": "true",
"db.name": "helowin",
"db.name.alias": "helowin"
},
"tasks": [
{
"connector": "source_connect_Oracle_Test_T_Order_0905",
"task": 0
}
],
"type": "source"
}
(2)用户表
①Oracle的source
{
"name": "ORACLE_TEST_ID_TIMESTAMP_INC",
"config": {
"connector.class": "com.ecer.kafka.connect.oracle.OracleSourceConnector",
"reset.offset": "true",
"incrementing.column.name": "ID",
"db.hostname": "192.168.0.100",
"tasks.max": "1",
"db.user.password": "system",
"table.blacklist": "",
"table.whitelist": "TEST.T_USER",
"mode": "incrementing",
"db.user": "system",
"db.port": "1521",
"db.fetch.size": "1",
"multitenant": "false",
"name": "ORACLE_TEST_ID_TIMESTAMP_INC",
"topic": "ORACLE_TEST_TOPIC",
"parse.dml.data": "true",
"db.name": "helowin",
"db.name.alias": "helowin"
},
"tasks": [
{
"connector": "ORACLE_TEST_ID_TIMESTAMP_INC",
"task": 0
}
],
"type": "source"
}
2、根据time自增的source
{
"name": "source_connect_Oracle_Test_T_Order_timestamp_0905",
"config": {
"connector.class": "com.ecer.kafka.connect.oracle.OracleSourceConnector",
"reset.offset": "true",
"timestamp.column.name": "UPDDATTIM_0",
"db.hostname": "192.168.0.100",
"tasks.max": "1",
"db.user.password": "system",
"table.blacklist": "",
"table.whitelist": "TEST.T_Order",
"mode": "timestamp+incrementing",
"db.user": "system",
"db.port": "1521",
"db.fetch.size": "1",
"multitenant": "false",
"name": "source_connect_Oracle_Test_T_Order_timestamp_0905",
"topic": "topic_Test_T_Order_timestamp_0905",
"parse.dml.data": "true",
"db.name": "helowin",
"db.name.alias": "helowin"
},
"tasks": [
{
"connector": "source_connect_Oracle_Test_T_Order_timestamp_0905",
"task": 0
}
],
"type": "source"
}
3、基于confluent的JDBC source
{
"name": "JDBC_ORACLE_SOURCE1",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "ID",
"topic.prefix": "JDBC_ORACLE_SOURCE_",
"connection.password": "system",
"connection.user": "system",
"name": "JDBC_ORACLE_SOURCE1",
"connection.url": "jdbc:oracle:thin:@//192.168.0.100:1521/helowin",
"table.whitelist": "TEST.T_USER"
},
"tasks": [
{
"connector": "JDBC_ORACLE_SOURCE1",
"task": 0
}
],
"type": "source"
}
(二)sink connector
{
"name": "POSTGRESQL_SINK_TEST",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"table.name.format": "wxf_test.T_USER",
"connection.password": "qaz123",
"tasks.max": "1",
"topics": "JDBC_ORACLE_SOURCE_T_USER",
"delete.enabled": "false",
"auto.evolve": "true",
"connection.user": "postgres",
"name": "POSTGRESQL_SINK_TEST",
"auto.create": "true",
"connection.url": "jdbc:postgresql://170.0.0.1:5432/bigdata",
"insert.mode": "upsert",
"pk.mode": "record_value"
},
"tasks": [
{
"connector": "POSTGRESQL_SINK_TEST",
"task": 0
}
],
"type": "sink"
}
作者:哥们要飞