0
点赞
收藏
分享

微信扫一扫

python实现airtable导出数据到数据库中

愚鱼看书说故事 2022-03-11 阅读 72
python
#!/usr/bin/env python
#coding=utf-8
import subprocess
import json
import os
import psycopg2
import re
import sys
import psycopg2.extras

def airtable(air_api_url,air_api_token):
    """Download every record of an Airtable table and flatten it for SQL use.

    Pages are requested until a response no longer carries an ``offset``.
    Each record's ``fields`` mapping is then flattened:

    * the record ``id`` is stored under the key ``'id'`` (replacing any
      field that already has that name);
    * every space, ``-`` and ``/`` in a field name is replaced by ``_``
      (two names that normalise to the same text collide; the later wins);
    * every value is converted with ``str()`` and single quotes are replaced
      by double quotes -- presumably so values can be embedded in SQL string
      literals by the caller; note that this is lossy.

    Args:
        air_api_url: URL of the table endpoint.
        air_api_token: token sent as ``Authorization: Bearer <token>``.

    Returns:
        A ``(column_names, records)`` tuple: the list of all normalised field
        names in first-seen order, and the list of flattened record dicts.

    Raises:
        urllib.error.URLError: if a page cannot be downloaded.
        KeyError: if a response has no ``records`` entry.
    """
    # Function-scope imports: the file's import block does not provide urllib.
    import urllib.parse
    import urllib.request

    # One pass maps every separator, so names mixing several kinds of
    # separator (e.g. "a-b c") are fully normalised.
    key_table = str.maketrans({' ': '_', '-': '_', '/': '_'})

    air_api_url_base = air_api_url
    # Do not emit a second '?' when the caller's URL already has a query part.
    separator = '&' if '?' in air_api_url_base else '?'
    headers = {'Authorization': 'Bearer %s' % air_api_token}

    list_all = []
    while True:
        # No shell is involved, so the URL and token cannot be interpreted as
        # shell syntax.
        request = urllib.request.Request(air_api_url, headers=headers)
        with urllib.request.urlopen(request, timeout=60) as reply:
            page = json.loads(reply.read().decode('utf-8'))
        list_all.extend(page['records'])
        if 'offset' not in page:
            break
        air_api_url = (air_api_url_base + separator + 'offset='
                       + urllib.parse.quote(page['offset'], safe=''))
        print('airtable访问的url:' + air_api_url)
    print('airtable表中总计行数为:' + str(len(list_all)))

    list1 = []
    columns = {}  # used as an ordered set of column names
    for record in list_all:
        fields = dict(record['fields'])
        fields['id'] = record['id']
        flat = {}
        for key, value in fields.items():
            flat[key.translate(key_table)] = str(value).replace('\'', '"')
        list1.append(flat)
        columns.update(dict.fromkeys(flat))
    return list(columns), list1

def diff_list(list1,list2):
    """Tell whether every name in ``list1`` also occurs in ``list2``.

    The comparison ignores letter case. Names that appear only in ``list2``
    do not affect the result.

    Args:
        list1: iterable of strings that must all be present.
        list2: iterable of strings to look them up in.

    Returns:
        True when no name of ``list1`` is missing from ``list2``, else False.
    """
    wanted = {name.lower() for name in list1}
    available = {name.lower() for name in list2}
    return wanted.issubset(available)

def sql(air_column_list,air_records_list,air_main_key):
    """Build the parenthesised column-definition clause of a CREATE TABLE.

    The type of a column is decided from the first record that contains it:
    the main key becomes ``text PRIMARY KEY NOT NULL``; a string shaped like
    ``2YYY-MM-DD`` becomes ``date NULL``; a string starting with
    ``2YYY-MM-DDT`` becomes ``timestamp NULL``; anything else ``text NULL``.

    Args:
        air_column_list: all column names; only its length is used, to stop
            scanning once every column has been typed.
        air_records_list: list of dicts mapping column name to value.
        air_main_key: name of the primary-key column.

    Returns:
        A string such as ``"(id text PRIMARY KEY NOT NULL, day date NULL)"``.
        Column names are inserted verbatim (no identifier quoting).
    """
    date_re = re.compile('^2[0-9][0-9][0-9]-[0-1][0-9]-[0-3][0-9]$')
    time_re = re.compile('^2[0-9][0-9][0-9]-[0-1][0-9]-[0-3][0-9]T')
    definitions = []
    seen = set()
    for record in air_records_list:
        for key, value in record.items():
            if key in seen:
                continue
            seen.add(key)
            # The key column is tested first so that a key whose value merely
            # looks like a date still gets its PRIMARY KEY constraint.
            if key == air_main_key:
                definitions.append(key + ' text PRIMARY KEY NOT NULL')
            elif isinstance(value, str) and date_re.match(value):
                definitions.append(key + ' date NULL')
            elif isinstance(value, str) and time_re.match(value):
                definitions.append(key + ' timestamp NULL')
            else:
                definitions.append(key + ' text NULL')
        if len(definitions) == len(air_column_list):
            break
    # Joining the pieces directly avoids going through repr(), which would
    # alter names containing quotes or backslashes.
    return '(' + ', '.join(definitions) + ')'

class Pq_opration():
    """Small helper that runs table-level SQL through a DB-API cursor.

    Table and column names are inserted into the SQL text verbatim (they
    cannot be sent as query parameters), so they must come from a trusted
    source. Row values are always sent as query parameters using the
    ``%s`` placeholder style.
    """

    def __init__(self,conn,cursor):
        """Store the connection (used for commits) and the cursor to run on."""
        self.conn = conn
        self.cursor = cursor

    @staticmethod
    def _key_set(rows, key):
        """Return the set of ``row[key]`` values of all rows that have ``key``."""
        return {row[key] for row in rows if row and key in row}

    def select(self,select_sth,from_sth,where_sth=None):
        """Run ``select <select_sth> from <from_sth> [where_sth]``.

        Args:
            select_sth: column list text.
            from_sth: table expression text.
            where_sth: optional trailing clause, including the ``where``
                keyword; omitted from the statement when None or empty.

        Returns:
            All rows as returned by ``cursor.fetchall()``.
        """
        statement = "select %s from %s %s" % (select_sth, from_sth, where_sth or '')
        self.cursor.execute(statement.rstrip())
        return self.cursor.fetchall()

    def create(self,db_table_total,db_sql):
        """Create table ``db_table_total`` with column clause ``db_sql`` and commit."""
        self.cursor.execute("create table %s %s" %(db_table_total,db_sql))
        self.conn.commit()

    def insert(self,_dict,db_table_total):
        """Insert one row and commit.

        Args:
            _dict: mapping of column name to value.
            db_table_total: target table name.
        """
        columns = list(_dict)
        placeholders = ','.join(['%s'] * len(columns))
        statement = ('insert into ' + db_table_total + '(' + ','.join(columns)
                     + ') values (' + placeholders + ')')
        # Values travel as parameters, so quotes, backslashes and newlines in
        # the data are stored unchanged instead of being mangled by repr().
        self.cursor.execute(statement, [_dict[column] for column in columns])
        self.conn.commit()

    def delete(self,main_key,value,db_table_total):
        """Delete the rows whose ``main_key`` column equals ``value`` and commit."""
        self.cursor.execute('delete from ' + db_table_total + ' where ' + main_key + ' = %s', (value,))
        self.conn.commit()

    def merge(self,list1,list2,db_table_total,main_key):
        """Update rows that already exist and insert the ones that do not.

        Args:
            list1: source records (dicts keyed by column name).
            list2: rows currently in the table (dicts keyed by lower-case
                column name).
            db_table_total: target table name.
            main_key: name of the key column as used in ``list1``.

        Empty source records are skipped. Source records are not modified.
        """
        existing = self._key_set(list2, main_key.lower())
        updated = 0
        inserted = 0
        for record in list1:
            if not record:
                continue
            if main_key in record and record[main_key] in existing:
                columns = [column for column in record if column != main_key]
                if not columns:
                    # Only the key is present: there is nothing to update.
                    continue
                assignments = ','.join(column + ' = %s' for column in columns)
                parameters = [record[column] for column in columns]
                parameters.append(record[main_key])
                self.cursor.execute('update ' + db_table_total + ' set ' + assignments
                                    + ' where ' + main_key + ' = %s', parameters)
                self.conn.commit()
                updated = updated + 1
                print('数据库更新'+ str(updated)+'条记录')
            else:
                self.insert(record,db_table_total)
                inserted = inserted + 1
                print('数据库插入'+ str(inserted)+'条记录')

    def overwrite(self,list1,list2,db_table_total,main_key):
        """Delete table rows whose key no longer occurs in the source records.

        Args:
            list1: source records (dicts keyed by column name).
            list2: rows currently in the table (dicts keyed by lower-case
                column name).
            db_table_total: target table name.
            main_key: name of the key column as used in ``list1``.
        """
        db_key = main_key.lower()
        # Source records are looked up by main_key and table rows by its
        # lower-case form; mixing the two up would make every row look stale.
        source_keys = self._key_set(list1, main_key)
        deleted = 0
        for row in list2:
            if not row or db_key not in row:
                continue
            if row[db_key] not in source_keys:
                self.delete(main_key,row[db_key],db_table_total)
                deleted = deleted + 1
                print('数据库删除'+ str(deleted)+'条记录')

    def append(self,list1,list2,db_table_total,main_key):
        """Insert the source records whose key is not yet in the table.

        Existing rows are left untouched. Arguments are the same as for
        ``merge``. Empty source records are skipped.
        """
        existing = self._key_set(list2, main_key.lower())
        inserted = 0
        for record in list1:
            if not record:
                continue
            if main_key in record and record[main_key] in existing:
                continue
            self.insert(record,db_table_total)
            inserted = inserted + 1
            print('数据库插入'+ str(inserted)+'条记录')

    def drop(self,db_table_total):
        """Drop table ``db_table_total`` and commit."""
        self.cursor.execute("drop table %s" %(db_table_total))
        self.conn.commit()

    def dict_to_sql(self,dict1,main_key):
        """Render a record as the ``col = value,...`` part of an UPDATE.

        The ``main_key`` column is left out. ``int`` and ``float`` values are
        written bare; everything else is written as a quoted string literal
        with embedded single quotes doubled. ``dict1`` is not modified.

        Returns:
            The comma-joined assignment list (empty string if there is
            nothing besides the key).
        """
        parts = []
        for column, value in dict1.items():
            if column == main_key:
                continue
            if type(value) in (int, float):
                parts.append(column + ' = ' + str(value))
            else:
                parts.append(column + ' = ' + '\'' + str(value).replace('\'', '\'\'') + '\'')
        return ','.join(parts)
        

if __name__ == "__main__":
    # Entry point: copy one Airtable table into one PostgreSQL table.
    # The single command-line argument is a comma-separated list of
    # name="value" pairs (air_api_url, air_api_token, db_name, db_host,
    # db_port, db_schema, db_table, db_user, db_pass, opt).
    import ast

    def _parse_cli_args(raw):
        """Parse 'name="value",name2="value2"' into a dict without exec()."""
        pattern = re.compile(
            r'\s*(\w+)\s*=\s*('
            r'"(?:[^"\\]|\\.)*"'
            r"|'(?:[^'\\]|\\.)*'"
            r'|[^,]*)\s*(?:,\s*|$)'
        )
        parsed = {}
        position = 0
        while position < len(raw):
            match = pattern.match(raw, position)
            if match is None:
                sys.exit('cannot parse arguments near: %r' % raw[position:position + 40])
            name, literal = match.group(1), match.group(2)
            try:
                # Accepts quoted strings and bare numbers, never executes code.
                parsed[name] = ast.literal_eval(literal)
            except (ValueError, SyntaxError):
                parsed[name] = literal.strip()
            position = match.end()
        return parsed

    def _create_and_fill(db, table, column_sql, records):
        """Create the table, then insert every non-empty record."""
        db.create(table, column_sql)
        count = 1
        for record in records:
            if record != {}:
                print('数据库写入' + str(count) + '条记录!')
                db.insert(record, table)
                count = count + 1

    if len(sys.argv) < 2:
        sys.exit('usage: %s \'air_api_url="...",air_api_token="...",db_name="...",'
                 'db_host="...",db_port="...",db_schema="...",db_table="...",'
                 'db_user="...",db_pass="...",opt="merge|append|overwrite"\'' % sys.argv[0])
    params = _parse_cli_args(sys.argv[1])
    required = ('air_api_url', 'air_api_token', 'db_name', 'db_host', 'db_port',
                'db_schema', 'db_table', 'db_user', 'db_pass')
    missing = [name for name in required if name not in params]
    if missing:
        sys.exit('missing parameters: ' + ', '.join(missing))

    air_api_url = params['air_api_url']
    air_api_token = params['air_api_token']
    db_name = params['db_name']
    db_host = params['db_host']
    db_port = params['db_port']
    db_schema = params['db_schema']
    db_table = params['db_table']
    db_user = params['db_user']
    db_pass = params['db_pass']
    # opt is only needed when the table already exists with matching columns.
    opt = params.get('opt')

    air_main_key = 'id'
    db_table_total = db_schema + '.' + db_table
    # Fetch the Airtable column names and record data.
    air_column_list,air_records_list = airtable(air_api_url,air_api_token)
    # Build the column clause of the CREATE TABLE statement from them.
    db_sql = sql(air_column_list,air_records_list,air_main_key)
    conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass,
                            host=db_host, port=db_port)
    try:
        pq_opration_ins = Pq_opration(conn,conn.cursor())
        pq_column_rows = pq_opration_ins.select('column_name','information_schema.columns',"where table_schema='%s' and table_name='%s'" %(db_schema,db_table))
        if not pq_column_rows:
            # The table does not exist yet.
            print('创建数据库表')
            _create_and_fill(pq_opration_ins, db_table_total, db_sql, air_records_list)
        else:
            pq_column_list = [row[0] for row in pq_column_rows]

            # Rows are read as dicts so they can be matched by column name.
            dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            pq_records_list = list(Pq_opration(conn,dict_cursor).select('*',db_table_total))

            if diff_list(air_column_list,pq_column_list):
                print('数据库更新记录')
                if opt == 'merge':
                    pq_opration_ins.merge(air_records_list,pq_records_list,db_table_total,air_main_key)
                elif opt == 'append':
                    pq_opration_ins.append(air_records_list,pq_records_list,db_table_total,air_main_key)
                elif opt == 'overwrite':
                    # merge receives copies so the key column is guaranteed to
                    # still be present for overwrite, without downloading the
                    # whole table a second time.
                    pq_opration_ins.merge([dict(record) for record in air_records_list],pq_records_list,db_table_total,air_main_key)
                    pq_opration_ins.overwrite(air_records_list,pq_records_list,db_table_total,air_main_key)
                else:
                    print("请输入opt变量的值")
            else:
                # Airtable has a column the table lacks: rebuild the table.
                print('删除旧数据表')
                pq_opration_ins.drop(db_table_total)
                print('创建新数据表')
                _create_and_fill(pq_opration_ins, db_table_total, db_sql, air_records_list)
    finally:
        # Release the connection even when a statement fails.
        conn.close()

#运行脚本的格式:
python3  airtable_export_to_pg.py 'air_api_url="XXX",air_api_token="XXX",db_name="XXX",db_host="XXX",db_port="XXX",db_schema="XXX",db_table="XXX",db_user="XXX",db_pass="XXX",opt="XXX"'

#说明:
air_api_url:airtable的base中table的api接口的url。
            获取方式:访问https://airtable.com/api 网址,点击需要导出的base,打开后的页面能找到table的url。
            举例:https://api.airtable.com/v0/appSxYPkicOH8rtqH/Table%201
                  https://api.airtable.com/v0/appxxxxxxxxxxxxxx/TableName

air_api_token:airtable用户的token
            获取方式:访问https://airtable.com/网址,点击右上角用户,下拉菜单点击account,打开的页面能找到api key。

db_name:postgresql数据库名

db_host:postgresql数据库地址

db_port:postgresql数据库端口

db_schema:postgresql数据库的模式名,又叫schema名

db_table:postgresql数据表名,又叫table名

db_user:postgresql数据库登录的用户名

db_pass:postgresql数据库登录的密码

opt: merge/append/overwrite  三种模式任选一种

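说明:merge 会更新数据库中已存在的记录并插入新增的记录,但不会删除 airtable 中已经被删除的行;overwrite 在 merge 的基础上,还会删除数据库中那些在 airtable 里已不存在的行,使两边的行完全保持一致;append 只插入数据库中尚不存在的记录,不会改动已存在的记录(即使它们在 airtable 中被修改过)。注意:无论选择哪种模式,只要 airtable 中出现了数据库表里没有的列(列名比较不区分大小写),脚本都会删除旧表并按最新数据重新建表、重新导入。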

举报

相关推荐

0 条评论