a. Dependency preparation
flink-connector-postgres-cdc-*.jar
 
b. Synchronizing Tables
By using PostgresSyncTableAction in a Flink DataStream job, or by submitting the action directly with flink run, you can synchronize one or more PostgreSQL tables into a single Paimon table.
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.7.0-incubating.jar \
    postgres_sync_table \
    --warehouse <warehouse_path> \
    --database <database_name> \
    --table <table_name> \
    [--partition_keys <partition_keys>] \
    [--primary_keys <primary_keys>] \
    [--type_mapping <option1,option2...>] \
    [--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
    [--metadata_column <metadata_column>] \
    [--postgres_conf <postgres_cdc_source_conf> [--postgres_conf <postgres_cdc_source_conf> ...]] \
    [--catalog_conf <paimon_catalog_conf> [--catalog_conf <paimon_catalog_conf> ...]] \
    [--table_conf <paimon_table_sink_conf> [--table_conf <paimon_table_sink_conf> ...]]
 
The configuration options are as follows:
| Configuration | Description |
|---|---|
| --warehouse | The path to the Paimon warehouse. |
| --database | The database name in the Paimon catalog. |
| --table | The Paimon table name. |
| --partition_keys | The partition keys for the Paimon table. Separate multiple partition keys with commas, for example "dt,hh,mm". |
| --primary_keys | The primary keys for the Paimon table. Separate multiple primary keys with commas, for example "buyer_id,seller_id". |
| --type_mapping | Specifies how PostgreSQL data types are mapped to Paimon types. Supported options: "to-string", which maps all PostgreSQL types to STRING. |
| --computed_column | The definitions of computed columns. The argument field is a field name from the PostgreSQL table. See the Paimon documentation for a complete list of configurations. |
| --metadata_column | Specifies which metadata columns to include in the output schema of the connector. Metadata columns provide additional information about the source data, for example: --metadata_column table_name,database_name,schema_name,op_ts. See the connector documentation for a complete list of available metadata. |
| --postgres_conf | The configuration for Flink CDC Postgres sources. Each configuration should be specified in the format "key=value". hostname, username, password, database-name, schema-name, table-name and slot.name are required; all others are optional. See the Flink CDC Postgres connector documentation for a complete list of configurations. |
| --catalog_conf | The configuration for the Paimon catalog. Each configuration should be specified in the format "key=value". See the Paimon documentation for a complete list of catalog configurations. |
| --table_conf | The configuration for the Paimon table sink. Each configuration should be specified in the format "key=value". See the Paimon documentation for a complete list of table configurations. |
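If the specified Paimon table does not exist, it is created automatically, and its schema is derived from all of the specified PostgreSQL tables.
If the Paimon table already exists, its schema is compared against the schemas of all of the specified PostgreSQL tables.
Example 1: Synchronize tables into one Paimon table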
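
The table above lists seven required --postgres_conf keys. Before submitting a job, it can help to check locally that none of them has been left out. The following is a minimal sketch of such a check: the function name missing_postgres_conf_keys is hypothetical and is not part of Paimon or Flink CDC, and it only inspects the key names, not whether the values are valid.

```shell
#!/bin/sh
# Hypothetical pre-flight helper (not part of Paimon or Flink CDC).
# Required keys, as listed in the --postgres_conf row of the table.
required_keys="hostname username password database-name schema-name table-name slot.name"

# Prints the required keys that do not appear among the given
# key=value arguments, separated by single spaces (empty if none).
# The body runs in a subshell so its variables stay local.
missing_postgres_conf_keys() (
    missing=""
    for key in $required_keys; do
        found=no
        for arg in "$@"; do
            # Compare only the part before the first '='.
            if [ "${arg%%=*}" = "$key" ]; then
                found=yes
                break
            fi
        done
        if [ "$found" = no ]; then
            missing="$missing $key"
        fi
    done
    # Drop the leading space that the accumulation leaves behind.
    echo "${missing# }"
)

# Example: slot.name has been forgotten.
missing_postgres_conf_keys hostname=127.0.0.1 username=root password=123456 \
    database-name=source_db schema-name=public table-name=source_table1
```

With the arguments shown, the function reports slot.name as the only missing key.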
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.7.0-incubating.jar \
    postgres_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition_keys pt \
    --primary_keys pt,uid \
    --computed_column '_year=year(age)' \
    --postgres_conf hostname=127.0.0.1 \
    --postgres_conf username=root \
    --postgres_conf password=123456 \
    --postgres_conf database-name='source_db' \
    --postgres_conf schema-name='public' \
    --postgres_conf table-name='source_table1|source_table2' \
    --postgres_conf slot.name='paimon_cdc' \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4
 
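As the example shows, the table-name option in postgres_conf accepts a regular expression, so the job monitors every table whose name matches it. The schemas of all matched tables are merged into a single Paimon table schema.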
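
To preview which tables a table-name pattern would select, the pattern can be tried locally against a list of candidate names. This sketch uses grep -E as a stand-in for the connector's regular expression engine and assumes the whole table name must match; both are assumptions for illustration, and the extra table names are made up.

```shell
#!/bin/sh
# Hypothetical local check, independent of Flink and Paimon.
pattern='source_table1|source_table2'

# Prints, one per line, the given table names that match the pattern.
matching_tables() (
    # -E: extended regular expressions; -x: the whole line must match.
    printf '%s\n' "$@" | grep -E -x -- "$pattern"
)

# source_table3 and orders are made-up names that should not be selected.
matching_tables source_table1 source_table2 source_table3 orders
```

Only source_table1 and source_table2 are printed; the other two names do not match the alternation.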
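Example 2: Synchronize sharded tables into one Paimon table
Set "schema-name" to a regular expression to capture multiple schemas.
Typical scenario: the table "source_table" is split across the schemas "source_schema1", "source_schema2", and so on, and the data from every "source_table" is synchronized into a single Paimon table.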
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.7.0-incubating.jar \
    postgres_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition_keys pt \
    --primary_keys pt,uid \
    --computed_column '_year=year(age)' \
    --postgres_conf hostname=127.0.0.1 \
    --postgres_conf username=root \
    --postgres_conf password=123456 \
    --postgres_conf database-name='source_db' \
    --postgres_conf schema-name='source_schema.+' \
    --postgres_conf table-name='source_table' \
    --postgres_conf slot.name='paimon_cdc' \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4
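
One detail of the pattern 'source_schema.+' is worth checking before running the job: ".+" requires at least one character after the prefix, so a schema named exactly "source_schema" would not be captured. The sketch below tries this locally with grep -E, assuming whole-name matching; it is an approximation of the connector's behavior, and the function name schema_is_captured is hypothetical.

```shell
#!/bin/sh
# Hypothetical local check, independent of Flink and Paimon.
schema_pattern='source_schema.+'

# Succeeds only if the whole schema name matches the pattern.
schema_is_captured() {
    printf '%s\n' "$1" | grep -E -x -q -- "$schema_pattern"
}

for schema in source_schema1 source_schema2 source_schema public; do
    if schema_is_captured "$schema"; then
        echo "$schema: captured"
    else
        echo "$schema: skipped"
    fi
done
```

If the bare "source_schema" should also be included, a pattern such as 'source_schema.*' would cover it under the same assumptions.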
 










