0
点赞
收藏
分享

微信扫一扫

在阿里云用Flink Sql同步polardb数据到hbase

E_topia 2021-09-19 阅读 106
日记本

技术准备:
开通dts
开通datahub
开通kafka队列(0.10版),按时付费每小时2.35元
开通hbase serverless版,每小时0.01元
开通flink1.11全托管版

数据流程:
polardb->dts->datahub->flink->kafka->flink->hbase

为什么还需要datahub传输到kafka,而不直接通过dts到kafka,因为dts同步的时候多张表只能选择到一个topic,而datahub可以同步到多个topic。

注意:关于dts同步数据到datahub的说明如下图:


将polardb的表rb_test2同步(当然也可以选择多张表),其表结构如下:

CREATE TABLE `rb_test2` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(20) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
  `password` varchar(20) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=54 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

在Flink中关联datahub中的topic

create table dh_in_testk2 (
    id INT,
    name VARCHAR,
    password VARCHAR,
    new_dts_sync_dts_record_id VARCHAR,
    new_dts_sync_dts_operation_flag VARCHAR,
    new_dts_sync_dts_before_flag VARCHAR,
    new_dts_sync_dts_after_flag VARCHAR
) WITH (
    'connector' = 'datahub',
    'endPoint' = 'http://dh-cn-shenzhen-int-vpc.aliyuncs.com',
    'project' = '*****',
    'topic' = 'rb_test2',
    -- topic是datahub中的
    'accessId' = '*****',
    'accessKey' = '*****',
    'subId' = '******'
    --subId是在datahub服务中添加的订阅id
);

在Flink中关联kafka中的topic

create table kk_out_test2 (
    id INT,
    name VARCHAR,
    password VARCHAR,
    new_dts_sync_dts_record_id VARCHAR,
    new_dts_sync_dts_operation_flag VARCHAR,
    new_dts_sync_dts_before_flag VARCHAR,
    new_dts_sync_dts_after_flag VARCHAR
) with (
    'connector' = 'kafka',
    'topic' = 'rb_test2',
    'properties.bootstrap.servers' = '***:9092,***9092,***:9092',
    'format' = 'json'
) 

运行Flink作业,将datahub topic rb_test2数据实时写入kafka,如果是更新操作,只同步更新后的数据。

insert into
    kk_out_test2
select
    *
from
    dh_in_testk2
where
    new_dts_sync_dts_operation_flag <> 'U'
    or new_dts_sync_dts_after_flag = 'Y'

在hbase中添加表

create 'test2','cf'

在Flink中关联hbase

CREATE TABLE hbase_test2 (
    rowkey STRING,
    cf ROW < id INT,
    name STRING,
    password STRING,
    new_dts_sync_dts_operation_flag STRING>
) with (
    'connector' = 'cloudhbase',
    'table-name' = 'test2',
    'zookeeper.quorum' = 'https://sh-***-hbase-serverless-in.hbase.rds.aliyuncs.com:443',
    'userName'='***',
    'password'='***'
);

运行Flink作业同步到hbase

insert into
    hbase_test2
select
    CONCAT(id,''),
    ROW (id, name, password,new_dts_sync_dts_operation_flag)
from
    kk_out_test2;

从polardb到hbase,时间大概会延迟2-3秒,如果中间转化过程比较多,那么这个时间会相应的增加。在hbase中获取值的时候,需要判断new_dts_sync_dts_operation_flag 为D的时候,这个值是被删除的。

举报

相关推荐

0 条评论