0
点赞
收藏
分享

微信扫一扫

QT的一些小操作

君之言之 03-16 09:30 阅读 5

a.依赖准备

flink-connector-postgres-cdc-*.jar

b.Synchronizing Tables(同步表)

在Flink DataStream作业中使用 PostgresSyncTableAction 或直接通过flink run,可以将PostgreSQL中的一个或多个表同步到一个Paimon表中。

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.7.0-incubating.jar \
    postgres_sync_table
    --warehouse <warehouse_path> \
    --database <database_name> \
    --table <table_name> \
    [--partition_keys <partition_keys>] \
    [--primary_keys <primary_keys>] \
    [--type_mapping <option1,option2...>] \
    [--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
    [--metadata_column <metadata_column>] \
    [--postgres_conf <postgres_cdc_source_conf> [--postgres_conf <postgres_cdc_source_conf> ...]] \
    [--catalog_conf <paimon_catalog_conf> [--catalog_conf <paimon_catalog_conf> ...]] \
    [--table_conf <paimon_table_sink_conf> [--table_conf <paimon_table_sink_conf> ...]]

配置信息如下

ConfigurationDescription
–warehouseThe path to Paimon warehouse.
–databaseThe database name in Paimon catalog.
–tableThe Paimon table name.
–partition_keysThe partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example “dt,hh,mm”.
–primary_keysThe primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example “buyer_id,seller_id”.
–type_mappingIt is used to specify how to map PostgreSQL data type to Paimon type. Supported options:“to-string”: maps all PostgreSQL types to STRING.
–computed_columnThe definitions of computed columns. The argument field is from PostgreSQL table field name. See here for a complete list of configurations.
–metadata_column–metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example: --metadata_column table_name,database_name,schema_name,op_ts. See its document for a complete list of available metadata.
–postgres_confThe configuration for Flink CDC Postgres sources. Each configuration should be specified in the format “key=value”. hostname, username, password, database-name, schema-name, table-name and slot.name are required configurations, others are optional. See its document for a complete list of configurations.
–catalog_confThe configuration for Paimon catalog. Each configuration should be specified in the format “key=value”. See here for a complete list of catalog configurations.
–table_confThe configuration for Paimon table sink. Each configuration should be specified in the format “key=value”. See here for a complete list of table configurations.

如果指定的Paimon表不存在,将自动创建该表,表结构将从所有指定的PostgreSQL表中派生出来。

如果Paimon表已经存在,其表结构将与所有指定PostgreSQL表的结构进行比较。

示例1:将表同步到一个Paimon表中

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.7.0-incubating.jar \
    postgres_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition_keys pt \
    --primary_keys pt,uid \
    --computed_column '_year=year(age)' \
    --postgres_conf hostname=127.0.0.1 \
    --postgres_conf username=root \
    --postgres_conf password=123456 \
    --postgres_conf database-name='source_db' \
    --postgres_conf schema-name='public' \
    --postgres_conf table-name='source_table1|source_table2' \
    --postgres_conf slot.name='paimon_cdc' \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4

如示例所示,postgres_conf的表名支持正则表达式,以监控满足正则表达式的多个表。所有表的结构将合并到一个Paimon表结构中。

示例2:将分片的表同步到一个Paimon表中

使用正则表达式设置“schema-name”来捕获多个schemas。

典型场景:表“source_table”被拆分为模式“source_schema1”,“source_schema2”…,然后将所有“source_table”的数据同步到一个Paimon表中。

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.7.0-incubating.jar \
    postgres_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition_keys pt \
    --primary_keys pt,uid \
    --computed_column '_year=year(age)' \
    --postgres_conf hostname=127.0.0.1 \
    --postgres_conf username=root \
    --postgres_conf password=123456 \
    --postgres_conf database-name='source_db' \
    --postgres_conf schema-name='source_schema.+' \
    --postgres_conf table-name='source_table' \
    --postgres_conf slot.name='paimon_cdc' \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4
举报

相关推荐

0 条评论