0
点赞
收藏
分享

微信扫一扫

mongodb数据如何导入到clickhouse

一 背景说明

1 开发要求mongodb里的数据需要导入到clickhouse,方便他们分析,因此才有了如下的操作, 刚开始找了很多第三方的数据迁移软件,比如tapdata有这个功能,不过用过几次,经常报错,并且也是收费的。因此才决定自己写python脚本解决这个问题。

2 数据能否顺利导入,跟在 ck 库里创建表时所选的字段类型有着密切的关系

二 在ck里需要提前创建好指定的表

下面是我在ck里面创建了一个core_customer_test 这个表,像create_time,update_time,应该都是Datetime的类型,我这里选择了string,不然我的导入脚本就会报错

-- Target table for the MongoDB -> ClickHouse sync.
-- All nullable columns are kept as Nullable(String): the Python loader
-- stringifies every field, so mixed Mongo types (dates, ints, dicts)
-- land safely without type errors.
CREATE TABLE mongodb.core_customer_test
(
    `create_time` Nullable(String),
    `dep_id`  Nullable(String),
    `is_delete` Nullable(String),
    `invitor_userid` Nullable(String),
    `name` Nullable(String),
    `qx_name` Nullable(String),
    `unionid` Nullable(String),
    `video_add_time` Nullable(String),
    `lunar_birthday` Nullable(String),
    `external_userid` Nullable(String),
    -- _id is the Mongo ObjectId and the sort key, so it must be non-nullable.
    -- String is the canonical ClickHouse type (Varchar is only an alias).
    `_id` String,
    `birthday` Nullable(String),
    `update_time` Nullable(String),
    `sex` Nullable(String)  -- trailing comma removed: it is a syntax error in ClickHouse
)
ENGINE = MergeTree
ORDER BY (_id);

三 数据同步脚本

3.1 全量同步脚本

"""Full sync: copy every document of MongoDB collection ``core_customer``
into the ClickHouse table ``core_customer_test``.

Fixes over the previous version:
- pages are read with a stable sort on ``_id`` (skip/limit without a sort
  may skip or duplicate documents between pages);
- one INSERT per page instead of one INSERT per document;
- the loop stops when a page comes back empty, instead of the inconsistent
  ``read_num * 100 >= count`` counter (page size was 200, not 100).
"""
import pymongo
from clickhouse_driver import Client

# ClickHouse connection (sync target)
mydb = Client(
    host="clickhouse_ip.clickhouse.ads.aliyuncs.com",
    user="root_db",
    password="xxxxxx",
    database="mongodb",
)

# MongoDB connection (sync source)
client = pymongo.MongoClient('mongodb://root:xxxxxxx..@172.19.144.111:27018/')
db = client['sjzx']
collection = db['core_customer']

BATCH_SIZE = 200  # documents read and inserted per round trip

# Column order here must match the field order produced by _convert_row().
INSERT_QUERY = (
    'INSERT INTO core_customer_test '
    '(_id, birthday, create_time, dep_id, is_delete, name, qx_name, sex, '
    'update_time, video_add_time, lunar_birthday, invitor_userid, '
    'external_userid, follow_user) VALUES'
)


def _convert_row(row):
    """Convert one MongoDB document into the list of column values.

    Every target column is (Nullable) String, so each field is passed
    through str(); missing fields default to '' to avoid KeyError.
    """
    return [
        str(row["_id"]),  # required primary key (ObjectId -> str)
        str(row.get("birthday", '')),
        str(row.get("create_time", '')),
        str(row.get("dep_id", '')),
        str(row.get("is_delete", '')),
        str(row.get("name", '')),
        str(row.get("qx_name", '')),
        str(row.get("sex", '')),
        str(row.get("update_time", '')),
        str(row.get("video_add_time", '')),
        str(row.get("lunar_birthday", '')),
        str(row.get("invitor_userid", '')),
        str(row.get("external_userid", '')),
        str(row.get("follow_user", '')),
    ]


count = collection.count_documents({})
print(count)  # total documents to sync, for operator visibility

skip_num = 0
while True:
    # Stable pagination: sort by _id so skip/limit pages never overlap.
    page = collection.find().sort('_id', 1).skip(skip_num).limit(BATCH_SIZE)
    batch = [_convert_row(row) for row in page]
    if not batch:
        break  # an empty page means every document has been read
    # Insert the whole page in one call instead of one execute per row.
    mydb.execute(INSERT_QUERY, batch, types_check=True)
    skip_num += BATCH_SIZE

mydb.disconnect()


3.2 增量同步脚本

增量同步是暂定第二天的1点钟,同步前一天的所有数据,在定时任务计划里固定时间运行 比如:2023-4-13 的1点钟,会同步 2023-4-12的一天的数据,以此类推。

"""Incremental sync: run shortly after midnight to copy the previous
day's ``core_customer`` documents (filtered on ``create_time``) from
MongoDB into the ClickHouse table ``core_customer_test``.

Fixes over the previous version:
- count_documents() now uses the same date filter as find(); the old
  code counted the whole collection and looped far past the result set;
- the day boundaries are built with datetime.combine instead of the
  round-trip strftime/strptime (strptime was also called through an
  instance, which only works by accident);
- pages are read with a stable sort on ``_id``, and each page is
  inserted with one INSERT instead of one per document.
"""
import pymongo
from datetime import datetime, time, timedelta

from clickhouse_driver import Client

# ClickHouse connection (sync target)
mydb = Client(
    host="clickhouse_ip.clickhouse.ads.aliyuncs.com",
    user="root_db",
    password="xxxxxxx",
    database="mongodb",
)

# MongoDB connection (sync source)
client = pymongo.MongoClient('mongodb://root:xxxxx..@172.19.144.111:27018/')
db = client['sjzx']
collection = db['core_customer']

BATCH_SIZE = 200  # documents read and inserted per round trip

# Column order here must match the field order produced by _convert_row().
INSERT_QUERY = (
    'INSERT INTO core_customer_test '
    '(_id, birthday, create_time, dep_id, is_delete, name, qx_name, sex, '
    'update_time, video_add_time, lunar_birthday, invitor_userid, '
    'external_userid, follow_user) VALUES'
)


def _convert_row(row):
    """Convert one MongoDB document into the list of column values.

    Every target column is (Nullable) String, so each field is passed
    through str(); missing fields default to '' to avoid KeyError.
    """
    return [
        str(row["_id"]),  # required primary key (ObjectId -> str)
        str(row.get("birthday", '')),
        str(row.get("create_time", '')),
        str(row.get("dep_id", '')),
        str(row.get("is_delete", '')),
        str(row.get("name", '')),
        str(row.get("qx_name", '')),
        str(row.get("sex", '')),
        str(row.get("update_time", '')),
        str(row.get("video_add_time", '')),
        str(row.get("lunar_birthday", '')),
        str(row.get("invitor_userid", '')),
        str(row.get("external_userid", '')),
        str(row.get("follow_user", '')),
    ]


# Yesterday's [00:00:00, 23:59:59] window, matching the original filter.
yesterday = (datetime.now() - timedelta(days=1)).date()
yes_date_start = datetime.combine(yesterday, time.min)
yes_date_end = datetime.combine(yesterday, time(23, 59, 59))
myquery = {'create_time': {'$gte': yes_date_start, '$lte': yes_date_end}}

# Count the FILTERED set (the old code counted the whole collection).
count = collection.count_documents(myquery)
print(count)

skip_num = 0
while True:
    # Stable pagination: sort by _id so skip/limit pages never overlap.
    page = collection.find(myquery).sort('_id', 1).skip(skip_num).limit(BATCH_SIZE)
    batch = [_convert_row(row) for row in page]
    if not batch:
        break  # an empty page means the filtered set has been read
    # Insert the whole page in one call instead of one execute per row.
    mydb.execute(INSERT_QUERY, batch, types_check=True)
    skip_num += BATCH_SIZE

mydb.disconnect()

四 到clickhouse查看数据

image.png

最后,要感谢郑照辉的帮助。

举报

相关推荐

0 条评论