0
点赞
收藏
分享

微信扫一扫

HTML和HTML5有什么区别

color_小浣熊 2024-09-03 阅读 11

文章目录

Flink CDC MySQL数据同步到Doris表同步配置生成工具类

工具类

在这里插入图片描述

生成的配置

要同步表为:
customer_user.tb_business_user_info
express.route_push_service
请提前自行到doris中建好目标数据库,如果没有会报错
同步的配置文件如下:(将配置内容保存为xxx.yaml文件到flink cdc提交任务)


pipeline:
  name: Sync MySQL Tables To Doris
  parallelism: 1
source:
  type: mysql
  hostname: 10.1.0.24
  port: 3306
  username: root
  password: xxxxxxxxx
  tables: customer_user.tb_business_user_info,express.route_push_service
  server-id: your MYSQL serverId
  server-time-zone: UTC+08
sink:
  type: doris
  fenodes: 10.1.0.27:8030,10.1.0.50:8030,10.1.0.244:8030
  benodes: 10.1.0.27:8040,10.1.0.50:8040,10.1.0.244:8040
  username: root
  password: "xxxxxxxxxxx"
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 3
route:
  - source-table: customer_user.tb_business_user_info
    sink-table: test.ods_customer_user_tb_business_user_info
    description: sync customer_user.tb_business_user_info to test.ods_customer_user_tb_business_user_info
  - source-table: express.route_push_service
    sink-table: test.ods_express_route_push_service
    description: sync express.route_push_service to test.ods_express_route_push_service

提交任务

任务提交后在flink中可以看到,它会保持全量+增量订阅进行同步,无目标表会自动建表,未同步全量数据会先同步全量。

在这里插入图片描述

工具类java代码


import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import lombok.Builder;
import lombok.Data;

import java.util.*;
import java.util.stream.Collectors;

/**
 * @author humorchen
 * date: 2024/8/29
 * description: mysql 同步到doris配置生成器
 **/
public class MysqlSyncToDorisConfigUtil {
    public  interface GetConfig{
        String getConfig();
    }
    public interface CheckConfig{
        void checkConfig();
    }

    public interface InitConfig{
        void initConfig();
    }
    @Data
    @Builder
    public static class MysqlSource implements GetConfig,CheckConfig,InitConfig{
        private String host;
        private int port;
        private String username;
        private String password;
        private String serverId;
        private String timezone;
        private List<TableName> tableNames;
        private static final String TEMPLATE = "source:\n" +
                "  type: mysql\n" +
                "  hostname: #{host}\n" +
                "  port: #{port}\n" +
                "  username: #{username}\n" +
                "  password: #{password}\n" +
                "  tables: #{tables}\n" +
                "  server-id: #{serverId}\n" +
                "  server-time-zone: #{timezone}\n";


        @Override
        public void initConfig() {
            if (port< 1){
                port = 3306;
            }
            if (StrUtil.isBlank(timezone)){
                timezone = "UTC+08";
            }
            if (CollectionUtil.isNotEmpty(tableNames)){
                List<String> collect = tableNames.stream().map(TableName::toString).distinct().collect(Collectors.toList());
                CollectionUtil.sortByPinyin(collect);
                this.tableNames = collect.stream().map(s -> {
                    String[] split = s.split("\\.");
                    return TableName.builder().database(split[0]).tableName(split[1]).build();
                }).collect(Collectors.toList());
            }
        }

        @Override
        public void checkConfig() {
            Assert.notBlank(host);
            Assert.isTrue(port>100);
            Assert.notBlank(username);
            Assert.notBlank(password);
            Assert.notBlank(serverId);
            Assert.notBlank(timezone);
            Assert.isTrue(tableNames != null && !tableNames.isEmpty());
        }

        @Override
        public String getConfig() {
            StringBuilder builder = new StringBuilder();
            if (tableNames != null){
                int i = 0;
                for (TableName tableName : tableNames) {
                    if (i > 0){
                        builder.append(",");
                    }
                    i++;
                    builder.append(tableName.toString());
                }
            }
            String tableNames = builder.toString();
            return TEMPLATE.replace("#{host}",host).replace("#{port}",String.valueOf(port)).replace("#{username}",username).replace("#{password}",password).replace("#{tables}",tableNames).replace("#{serverId}",serverId).replace("#{timezone}",timezone);
        }
    }
    @Data
    @Builder
    public static class TableName{
        private String database;
        private String tableName;

        @Override
        public boolean equals(Object o){
            if (! (o instanceof TableName)){
                return false;
            }
            TableName target = (TableName) o;
            if (!Objects.equals(database,target.database)){
                return false;
            }
            if (!Objects.equals(tableName,target.tableName)){
                return false;
            }
            return true;
        }

        @Override
        public int hashCode(){
            int hash = 0;
            if (database != null){
                hash += database.hashCode();
            }
            if (tableName != null){
                hash += tableName.hashCode();
            }
            return hash;
        }

        @Override
        public String toString(){
            return (database+"."+tableName).replace(" ","");
        }

    }
    @Data
    @Builder
    public static class DorisSource implements GetConfig,CheckConfig{
        private List<DorisFeNode> feNodeList ;
        private List<DorisBeNode> beNodeList;
        private String username;
        private String password;
        private String database;
        private static final String TEMPLATE = "sink:\n" +
                "  type: doris\n" +
                "  fenodes: #{feNodeList}\n" +
                "  benodes: #{beNodeList}\n" +
                "  username: #{username}\n" +
                "  password: \"#{password}\"\n" +
                "  table.create.properties.light_schema_change: true\n" +
                "  table.create.properties.replication_num: 3";

        @Override
        public void checkConfig() {
            Assert.isTrue(CollectionUtil.isNotEmpty(feNodeList));
            Assert.isTrue(CollectionUtil.isNotEmpty(beNodeList));
            Assert.notBlank(username);
            Assert.notBlank(password);
            Assert.notBlank(database);
        }

        @Override
        public String getConfig() {
            StringBuilder feNodeBuilder = new StringBuilder();
            if (feNodeList != null){
                for (int i = 0; i < feNodeList.size(); i++) {
                    if (i >0){
                        feNodeBuilder.append(",");
                    }
                    feNodeBuilder.append(feNodeList.get(i).toString());
                }
            }
            StringBuilder beNodeBuilder = new StringBuilder();
            if (beNodeList != null){
                for (int i = 0; i < beNodeList.size(); i++) {
                    if (i >0){
                        beNodeBuilder.append(",");
                    }
                    beNodeBuilder.append(beNodeList.get(i).toString());
                }
            }
            return TEMPLATE.replace("#{feNodeList}",feNodeBuilder.toString()).replace("#{beNodeList}",beNodeBuilder.toString()).replace("#{username}",username).replace("#{password}",password);
        }
    }

    @Data
    @Builder
    public static class DorisFeNode{
        private String host;
        private int port;

        @Override
        public String toString(){
            return host+":"+port;
        }
    }

    @Data
    @Builder
    public static class DorisBeNode{
        private String host;
        private int port;
        @Override
        public String toString(){
            return host+":"+port;
        }
    }
    @Data
    @Builder
    public static class PipeLineConfig implements GetConfig,CheckConfig,InitConfig{
        private String name;
        private int parallelism;

        private static final String TEMPLATE = "\n" +
                "pipeline:\n" +
                "  name: #{name}\n" +
                "  parallelism: #{parallelism}\n";

        @Override
        public void checkConfig() {
            Assert.notBlank(name);
            Assert.isTrue(parallelism > 0);
        }

        @Override
        public String getConfig() {
            return TEMPLATE.replace("#{name}",name).replace("#{parallelism}",String.valueOf(parallelism));
        }

        @Override
        public void initConfig() {
            if (name == null){
                name = "Sync MySQL Tables To Doris";
            }
            if (parallelism <1){
                parallelism = 1;
            }
        }
    }
    @Data
    @Builder
    public static class RouteConfig implements GetConfig,CheckConfig,InitConfig{
        private MysqlSource mysqlSource;
        private DorisSource dorisSource;
        private String tablePrefix;
        private static final String CONFIG_PREFIX = "\nroute:\n";
        private static final String TEMPLATE = "  - source-table: #{sourceDatabase}.#{sourceTable}\n" +
                "    sink-table: #{sinkDatabase}.#{tablePrefix}#{sourceDatabase}_#{sourceTable}\n" +
                "    description: sync #{sourceDatabase}.#{sourceTable} to #{sinkDatabase}.#{tablePrefix}#{sourceDatabase}_#{sourceTable}\n";

        @Override
        public void initConfig() {
            if (tablePrefix == null){
                tablePrefix = "ods_";
            }
        }

        @Override
        public void checkConfig() {
            Assert.notNull(tablePrefix);
        }

        @Override
        public String getConfig() {
            StringBuilder builder = new StringBuilder();
            builder.append(CONFIG_PREFIX);
            List<TableName> tableNames = mysqlSource.tableNames;
            for (TableName tableName : tableNames) {
                builder.append(TEMPLATE.replace("#{sourceDatabase}",tableName.database).replace("#{sourceTable}",tableName.tableName).replace("#{sinkDatabase}", dorisSource.database)
                        .replace("#{tablePrefix}",tablePrefix)
                );
            }
            return builder.toString();
        }
    }

    public static String syncTables(MysqlSource mysqlSource,DorisSource dorisSource,RouteConfig routeConfig){
        return syncTables(mysqlSource,dorisSource,routeConfig,null);
    }
    public static String syncTables(MysqlSource mysqlSource,DorisSource dorisSource,RouteConfig routeConfig,PipeLineConfig pipeLineConfig){
        if (pipeLineConfig == null){
            pipeLineConfig = PipeLineConfig.builder().build();
        }
        Assert.notNull(mysqlSource);
        Assert.notNull(dorisSource);
        Assert.notNull(routeConfig);
        mysqlSource.initConfig();
        routeConfig.initConfig();
        pipeLineConfig.initConfig();
        pipeLineConfig.checkConfig();
        mysqlSource.checkConfig();
        dorisSource.checkConfig();
        routeConfig.checkConfig();
        return pipeLineConfig.getConfig() +
                mysqlSource.getConfig() +
                dorisSource.getConfig() +
                routeConfig.getConfig();
    }

    public static Set<TableName> getTableNamesFromStr(String str){
        String[] split = str.split("\n");
        Set<TableName> ret = new HashSet<>();
        for (String s : split) {
            if (StrUtil.isNotBlank(s)){
                s = s.replace(" ","").toLowerCase();
                String[] ss = s.split("\\.");
                ret.add(TableName.builder().database(ss[0]).tableName(ss[1]).build());
            }
        }
        return ret;
    }
    public static void main(String[] args) {
        String tables = "customer_user.tb_business_user_info\n" +
                "express.route_push_service\n";
        Set<TableName> tableNameSet = getTableNamesFromStr(tables);

        MysqlSource mysqlSource = MysqlSource.builder().host("10.1.0.24").port(3306).username("root").password("xxxxxxxxx").serverId("your MYSQL serverId").tableNames(new ArrayList<>(tableNameSet)).build();

        DorisSource dorisSource = DorisSource.builder().
                feNodeList(Lists.newArrayList(DorisFeNode.builder().host("10.1.0.27").port(8030).build(),
                        DorisFeNode.builder().host("10.1.0.50").port(8030).build(),
                        DorisFeNode.builder().host("10.1.0.244").port(8030).build()
                        ))
                .beNodeList(Lists.newArrayList(DorisBeNode.builder().host("10.1.0.27").port(8040).build(),
                        DorisBeNode.builder().host("10.1.0.50").port(8040).build(),
                        DorisBeNode.builder().host("10.1.0.244").port(8040).build())
                        )
                .username("root").password("xxxxxxxxxxx").database("test").build();
        RouteConfig routeConfig = RouteConfig.builder().mysqlSource(mysqlSource).dorisSource(dorisSource).tablePrefix("ods_").build();
        String config = syncTables(mysqlSource,dorisSource,routeConfig);
        System.out.println("要同步表为:");
        for (TableName tableName : mysqlSource.tableNames) {
            System.out.println(tableName.toString());
        }
        System.out.println("请提前自行到doris中建好目标数据库,如果没有会报错");
        System.out.println("同步的配置文件如下:(将配置内容保存为xxx.yaml文件到flink cdc提交任务)");
        System.out.println();
        System.out.println(config);
    }

    public static void main12(String[] args){
        // 要同步的表
        String tables = "customer_user.tb_business_user_info\n" +
                "express.route_push_service\n" +
                "funds.dunning_order_request_table\n" +
                "funds.payout_order_table\n" +
                "funds.pdd_settle_table\n" +
                "manage.coupon_order_detail\n" +
                "manage.fulfillment_info\n" +
                "manage.order_report_detail\n" +
                "manage.recycle_inspection_report\n" +
                "manage.recycle_store_order_review\n" +
                "market.device\n" +
                "market.device \n" +
                "market.order\n" +
                "market.order_and_device\n" +
                "order.order_finance_detail\n" +
                "order.order_info\n" +
                "order.order_status\n" +
                "order.order_store_daily\n" +
                "order.order_store_recycle_detail\n" +
                "order.order_trade_in_detail\n" +
                "storage.delivery_order\n" +
                "storage.device\n" +
                "storage.inspect_order\n" +
                "storage.receive_order_detail\n" +
                "storage.recycle_updoor_inspections \n" +
                "storage.send_goods_order_detail\n" +
                "storage.storage_order\n" +
                "storage.unpack\n" +
                "storage.warehouse\n" +
                "storage.work_order_result\n";
        // 同步成功的表
        String now = "ods_customer_user_tb_business_user_info\n" +
                "ods_express_route_push_service\n" +
                "ods_funds_dunning_order_request_table\n" +
                "ods_funds_payout_order_table\n" +
                "ods_manage_coupon_order_detail\n" +
                "ods_manage_fulfillment_info\n" +
                "ods_manage_order_report_detail\n" +
                "ods_manage_recycle_inspection_report\n" +
                "ods_manage_recycle_store_order_review\n" +
                "ods_market_device\n" +
                "ods_market_order\n" +
                "ods_market_order_and_device\n" +
                "ods_order_order_finance_detail\n" +
                "ods_order_order_info\n" +
                "ods_order_order_status\n" +
                "ods_order_order_store_daily\n" +
                "ods_order_order_store_recycle_detail\n" +
                "ods_order_order_trade_in_detail\n" +
                "ods_storage_delivery_order\n" +
                "ods_storage_device\n" +
                "ods_storage_inspect_order\n" +
                "ods_storage_receive_order_detail\n" +
                "ods_storage_recycle_updoor_inspections\n" +
                "ods_storage_send_goods_order_detail\n" +
                "ods_storage_storage_order\n" +
                "ods_storage_unpack\n" +
                "ods_storage_warehouse\n" +
                "ods_storage_work_order_result";
        checkLostTables(tables,now);
    }

    /**
     * 检查同步丢失的表,有时候会有报错没有提示是哪个表,这样对比看是是哪个失败了
     *
     * @param target
     * @param now
     */
    public static void checkLostTables(String target,String now){
        String[] targetSplit = target.split("\n");
        ArrayList<String> targetList = new ArrayList<>();
        for (int i = 0; i < targetSplit.length; i++) {
            targetList.add("ods_"+targetSplit[i].replace(".","_").replace(" ",""));
        }
        String[] nowSplit = now.split("\n");
        ArrayList<String> nowList = new ArrayList<>();
        for (String s : nowSplit) {
            nowList.add(s.replace(" ",""));
        }
        CollectionUtil.sortByPinyin(targetList);
        CollectionUtil.sortByPinyin(nowList);
        System.out.println("target len: "+targetList.size());
        System.out.println("now len: "+nowList.size());
        Collection<String> subtract = CollectionUtil.subtract(targetList, nowList);
        System.out.println(JSONObject.toJSONString(subtract));

        for (int i = 0; i < targetList.size(); i++) {
            try {
                System.out.print(targetList.get(i));
            }catch (Exception e){}
            System.out.print("  \t  ");
            try {
                System.out.print(nowList.get(i));
            }catch (Exception e){}
            System.out.println( );
        }
    }
}
举报

相关推荐

0 条评论