#!/usr/bin/env python
#coding=utf-8
import subprocess
import json
import os
import psycopg2
import re
import sys
import psycopg2.extras
def airtable(air_api_url,air_api_token):
    """Download every record of an Airtable table and prepare it for SQL use.

    The table is fetched page by page with ``curl``; as long as a response
    carries an ``offset`` value, the next page is requested with that offset.

    Args:
        air_api_url: URL of the table endpoint (first page).
        air_api_token: token sent as ``Authorization: Bearer <token>``.

    Returns:
        A ``(column_names, records)`` tuple. ``records`` is a list with one
        dict per Airtable record: the record's ``fields`` plus an ``id`` entry
        holding the record id, with every space, ``-`` and ``/`` in a field
        name replaced by ``_`` and every value converted to ``str`` with
        single quotes swapped for double quotes. ``column_names`` lists every
        field name that occurs in any record, in first-seen order.

    Raises:
        subprocess.CalledProcessError: if ``curl`` exits with a non-zero status.
        KeyError: if a response has no ``records`` entry.
    """
    # Local import: only this function needs it.
    from urllib.parse import quote

    air_api_url_base = air_api_url
    # Do not produce "...?view=x?offset=..." when the caller's URL already
    # carries a query string.
    offset_separator = '&' if '?' in air_api_url_base else '?'
    auth_header = 'Authorization: Bearer %s' % air_api_token
    list_all = []

    # The null device swallows curl's progress output; the ``with`` block makes
    # sure the handle is closed again.
    with open(os.devnull, 'w') as dev_null:
        while True:
            # An argument list (no shell) keeps characters such as `$`, `"` or
            # backticks in the URL/token from being interpreted by a shell.
            raw = subprocess.check_output(
                ['curl', air_api_url, '-H', auth_header],
                stderr=dev_null,
            )
            payload = json.loads(raw.decode('utf-8'))
            list_all.extend(payload['records'])
            if 'offset' not in payload:
                break
            air_api_url = (air_api_url_base + offset_separator + 'offset='
                           + quote(payload['offset'], safe=''))
            print('airtable访问的url:' + air_api_url)
    print('airtable表中总计行数为:' + str(len(list_all)))

    # Every separator is replaced, so a name such as "a b-c" becomes "a_b_c"
    # rather than keeping one of the characters.
    separators = str.maketrans(' -/', '___')
    list1 = []
    for record in list_all:
        fields = dict(record['fields'], id=record['id'])
        # Values are stringified and single quotes replaced -- presumably so
        # callers can embed them in SQL string literals; kept for compatibility.
        list1.append({
            name.translate(separators): str(value).replace('\'', '"')
            for name, value in fields.items()
        })

    # Union of all field names; a dict keeps first-seen order.
    columns = {}
    for fields in list1:
        for name in fields:
            columns.setdefault(name, None)
    list2 = list(columns)
    return list2,list1
def diff_list(list1,list2):
    """Tell whether every name in ``list1`` also occurs in ``list2``.

    The comparison ignores letter case: both inputs are lower-cased first.

    Args:
        list1: iterable of strings that must all be present.
        list2: iterable of strings to look them up in.

    Returns:
        ``True`` when no lower-cased name of ``list1`` is missing from the
        lower-cased names of ``list2`` (an empty ``list1`` yields ``True``),
        otherwise ``False``.
    """
    wanted = set()
    for name in list1:
        wanted.add(name.lower())
    available = set()
    for name in list2:
        available.add(name.lower())
    return wanted.issubset(available)
def sql(air_column_list,air_records_list,air_main_key):
    """Build the parenthesised column list for a ``create table`` statement.

    Each column's type is chosen from the first record in which the column
    appears:

    * a string shaped like ``2YYY-MM-DD``           -> ``date NULL``
    * a string starting with ``2YYY-MM-DDT``        -> ``timestamp NULL``
    * otherwise, the column named ``air_main_key``  -> ``text PRIMARY KEY NOT NULL``
    * anything else                                 -> ``text NULL``

    Args:
        air_column_list: all expected column names; only its length is used,
            to stop scanning once that many columns have been described.
        air_records_list: list of dicts mapping column name to value.
        air_main_key: name of the primary-key column.

    Returns:
        A string such as ``"(id text PRIMARY KEY NOT NULL, name text NULL)"``;
        ``"()"`` when no column was found. Column names are copied verbatim
        (they are neither quoted nor escaped).
    """
    date_re = re.compile(r'^2[0-9][0-9][0-9]-[0-1][0-9]-[0-3][0-9]$')
    time_re = re.compile(r'^2[0-9][0-9][0-9]-[0-1][0-9]-[0-3][0-9]T')

    definitions = []
    seen = set()
    for record in air_records_list:
        for key, value in record.items():
            if key in seen:
                continue
            seen.add(key)
            if isinstance(value, str) and date_re.match(value):
                column_type = 'date NULL'
            elif isinstance(value, str) and time_re.match(value):
                column_type = 'timestamp NULL'
            elif key == air_main_key:
                column_type = 'text PRIMARY KEY NOT NULL'
            else:
                column_type = 'text NULL'
            definitions.append(key + ' ' + column_type)
        # Every expected column has been described; later records cannot add more.
        if len(definitions) == len(air_column_list):
            break

    # Join explicitly instead of stripping quotes from repr(tuple): that trick
    # dropped apostrophes from column names and left stray double quotes.
    return '(' + ', '.join(definitions) + ')'
class Pq_opration():
    """Small helper that runs table-level SQL through a connection/cursor pair.

    The cursor must accept ``execute(query, params)`` with ``%s`` placeholders
    (the psycopg2 parameter style). Row *values* are always passed as
    parameters. Table and column *names* are spliced into the statement text
    unchanged, so they must come from a trusted source.
    """

    def __init__(self,conn,cursor):
        """Store the connection (used for commits) and the cursor (used for statements)."""
        self.conn = conn
        self.cursor = cursor

    @staticmethod
    def _key_values(rows, key):
        """Return the set of ``row[key]`` over all non-empty rows.

        Raises ``KeyError`` if a non-empty row lacks ``key``.
        """
        return {row[key] for row in rows if row}

    def select(self,select_sth,from_sth,where_sth=None):
        """Run ``select <select_sth> from <from_sth> [<where_sth>]`` and return all rows.

        ``where_sth`` is a complete clause including the ``where`` keyword; it
        is left out entirely when not given (previously the text ``None`` ended
        up in the statement).
        """
        query = "select %s from %s" % (select_sth, from_sth)
        if where_sth:
            query += " " + where_sth
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def create(self,db_table_total,db_sql):
        """Create table ``db_table_total`` with the column clause ``db_sql`` and commit."""
        self.cursor.execute("create table %s %s" %(db_table_total,db_sql))
        self.conn.commit()

    def insert(self,_dict,db_table_total):
        """Insert one row given as a ``{column: value}`` dict and commit.

        Raises:
            ValueError: if ``_dict`` is empty (there is nothing to insert).
        """
        if not _dict:
            raise ValueError('cannot insert an empty record into ' + db_table_total)
        columns = ','.join(_dict)
        placeholders = ','.join(['%s'] * len(_dict))
        # Values travel as parameters so quotes/backslashes in the data cannot
        # break or alter the statement.
        self.cursor.execute(
            'insert into %s(%s) values (%s)' % (db_table_total, columns, placeholders),
            tuple(_dict.values()),
        )
        self.conn.commit()

    def delete(self,main_key,value,db_table_total):
        """Delete the rows whose ``main_key`` column equals ``value`` and commit."""
        self.cursor.execute(
            'delete from ' + db_table_total + ' where ' + main_key + ' = %s',
            (value,),
        )
        self.conn.commit()

    def merge(self,list1,list2,db_table_total,main_key):
        """Update rows that already exist in the table and insert the others.

        Args:
            list1: source records (dicts keyed by column name). Not modified.
            list2: rows currently in the table, keyed by lower-cased column name.
            db_table_total: target table name.
            main_key: name of the key column as it appears in ``list1``.

        A record of ``list1`` counts as existing when its ``main_key`` value
        equals the ``main_key.lower()`` value of some row in ``list2``. A
        progress line is printed after every update and insert.
        """
        existing = self._key_values(list2, main_key.lower())
        count = 0
        num = 0
        for record in list1:
            if record and main_key in record and record[main_key] in existing:
                columns = [name for name in record if name != main_key]
                if not columns:
                    # Only the key is present: nothing to update.
                    continue
                set_clause = ','.join(name + ' = %s' for name in columns)
                params = tuple(record[name] for name in columns) + (record[main_key],)
                self.cursor.execute(
                    'update ' + db_table_total + ' set ' + set_clause
                    + ' where ' + main_key + ' = %s',
                    params,
                )
                self.conn.commit()
                num = num + 1
                print('数据库更新'+ str(num)+'条记录')
            else:
                self.insert(record,db_table_total)
                count = count + 1
                print('数据库插入'+ str(count)+'条记录')

    def overwrite(self,list1,list2,db_table_total,main_key):
        """Delete table rows that no longer have a matching source record.

        Args:
            list1: source records, keyed by column name (``main_key`` as given).
            list2: rows currently in the table, keyed by lower-cased column name.
            db_table_total: target table name.
            main_key: name of the key column as it appears in ``list1``.

        Raises:
            KeyError: if a non-empty record of ``list1`` has no ``main_key``
                entry -- failing is safer than treating the row set as
                unmatched and deleting it.
        """
        lower_key = main_key.lower()
        keep = self._key_values(list1, main_key)
        count = 0
        for row in list2:
            if not row or lower_key not in row:
                # Without a key value there is nothing to address the row by.
                continue
            if row[lower_key] in keep:
                continue
            self.delete(main_key,row[lower_key],db_table_total)
            count = count + 1
            print('数据库删除'+ str(count)+'条记录')

    def append(self,list1,list2,db_table_total,main_key):
        """Insert the records of ``list1`` that are not in the table yet.

        Matching works as in ``merge``; existing rows are left untouched.
        """
        existing = self._key_values(list2, main_key.lower())
        count = 0
        for record in list1:
            if record and main_key in record and record[main_key] in existing:
                continue
            self.insert(record,db_table_total)
            count = count + 1
            print('数据库插入'+ str(count)+'条记录')

    def drop(self,db_table_total):
        """Drop table ``db_table_total`` and commit."""
        self.cursor.execute("drop table %s" %(db_table_total))
        self.conn.commit()

    def dict_to_sql(self,dict1,main_key):
        """Render ``dict1`` (without ``main_key``) as a ``col = value,...`` SET clause.

        ``int``/``float`` values are written bare; everything else is written
        as a single-quoted string with embedded single quotes doubled.
        ``dict1`` is left unmodified.

        Raises:
            KeyError: if ``main_key`` is not in ``dict1``.
        """
        if main_key not in dict1:
            raise KeyError(main_key)
        parts = []
        for column, value in dict1.items():
            if column == main_key:
                continue
            # bool is excluded so it stays quoted, as with the exact type check.
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                parts.append(column + ' = ' + str(value))
            else:
                parts.append(column + ' = ' + '\'' + str(value).replace("'", "''") + '\'')
        return ','.join(parts)
def _load_records(db, records, db_table_total):
    """Insert every non-empty record, printing a running counter before each insert."""
    count = 1
    for record in records:
        if record != {}:
            print('数据库写入' + str(count) + '条记录!')
            db.insert(record, db_table_total)
            count = count + 1


def main():
    """Copy an Airtable table into a PostgreSQL table.

    Settings are read from ``sys.argv[1]``, a comma-separated list of
    ``name="value"`` assignments: ``air_api_url``, ``air_api_token``,
    ``db_name``, ``db_host``, ``db_port``, ``db_schema``, ``db_table``,
    ``db_user``, ``db_pass`` and ``opt``.

    * Target table missing: it is created and filled.
    * Airtable has a column the table lacks: the table is dropped, recreated
      and filled.
    * Otherwise ``opt`` selects ``merge``, ``append`` or ``overwrite``.
    """
    if len(sys.argv) < 2:
        sys.exit('usage: %s \'air_api_url="...",air_api_token="...",db_name="...",'
                 'db_host="...",db_port="...",db_schema="...",db_table="...",'
                 'db_user="...",db_pass="...",opt="..."\'' % sys.argv[0])

    # SECURITY NOTE(review): each comma-separated piece of the argument is run
    # through exec(), i.e. the command line is executed as Python code. Only
    # run this script with an argument you wrote yourself. Splitting on ","
    # also means a value containing a comma cannot be passed. Consider
    # replacing this with argparse or a config file.
    settings = {}
    for assignment in sys.argv[1].split(','):
        exec(assignment, globals(), settings)

    required = ('air_api_url', 'air_api_token', 'db_name', 'db_host', 'db_port',
                'db_schema', 'db_table', 'db_user', 'db_pass')
    missing = [name for name in required if name not in settings]
    if missing:
        # Fail before any network/database work instead of with a late NameError.
        sys.exit('missing settings: ' + ', '.join(missing))

    air_main_key = 'id'
    db_schema = settings['db_schema']
    db_table = settings['db_table']
    db_table_total = db_schema + '.' + db_table

    # Fetch the Airtable column names and record data.
    air_column_list,air_records_list = airtable(settings['air_api_url'],settings['air_api_token'])
    # Build the column clause of the create-table statement from them.
    db_sql = sql(air_column_list,air_records_list,air_main_key)

    conn = psycopg2.connect(database=str(settings['db_name']), user=str(settings['db_user']),
                            password=str(settings['db_pass']), host=str(settings['db_host']),
                            port=str(settings['db_port']))
    try:
        pq_opration_ins = Pq_opration(conn,conn.cursor())
        pq_column_rows = pq_opration_ins.select('column_name','information_schema.columns',"where table_schema='%s' and table_name='%s'" %(db_schema,db_table))

        if not pq_column_rows:
            # No columns known for this table: it does not exist yet.
            print('创建数据库表')
            pq_opration_ins.create(db_table_total,db_sql)
            _load_records(pq_opration_ins, air_records_list, db_table_total)
            return

        pq_column_list = [row[0] for row in pq_column_rows]
        if not diff_list(air_column_list,pq_column_list):
            # Airtable has a column the table lacks: rebuild the table.
            print('删除旧数据表')
            pq_opration_ins.drop(db_table_total)
            print('创建新数据表')
            pq_opration_ins.create(db_table_total,db_sql)
            _load_records(pq_opration_ins, air_records_list, db_table_total)
            return

        # Existing rows are needed only from here on; read them as dicts.
        dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        pq_records_list = list(Pq_opration(conn,dict_cursor).select('*',db_table_total))

        print('数据库更新记录')
        opt = settings.get('opt')
        if opt == 'merge':
            pq_opration_ins.merge(air_records_list,pq_records_list,db_table_total,air_main_key)
        elif opt == 'append':
            pq_opration_ins.append(air_records_list,pq_records_list,db_table_total,air_main_key)
        elif opt == 'overwrite':
            # merge() may strip the key from the dicts it is given, so hand it
            # copies; the intact originals are then used to find rows to delete
            # (this replaces downloading the whole table a second time).
            pq_opration_ins.merge([dict(record) for record in air_records_list],pq_records_list,db_table_total,air_main_key)
            pq_opration_ins.overwrite(air_records_list,pq_records_list,db_table_total,air_main_key)
        else:
            print("请输入opt变量的值")
    finally:
        # Release the connection even when a statement fails.
        conn.close()


if __name__ == "__main__":
    main()
#运行脚本的格式:
# python3 airtable_export_to_pg.py 'air_api_url="XXX",air_api_token="XXX",db_name="XXX",db_host="XXX",db_port="XXX",db_schema="XXX",db_table="XXX",db_user="XXX",db_pass="XXX",opt="XXX"'
#说明:
# air_api_url:Airtable 的 base 中某个 table 的 API 接口 URL。
#     获取方式:访问 https://airtable.com/api ,点击需要导出的 base,在打开的页面中可以找到该 table 的 URL。
#     举例:https://api.airtable.com/v0/appSxYPkicOH8rtqH/Table%201
#           https://api.airtable.com/v0/appxxxxxxxxxxxxxx/TableName
# air_api_token:Airtable 用户的 token。
#     获取方式:访问 https://airtable.com/ ,点击右上角的用户,在下拉菜单中点击 Account,在打开的页面中可以找到 API key。
# db_name:PostgreSQL 数据库名
# db_host:PostgreSQL 数据库地址
# db_port:PostgreSQL 数据库端口
# db_schema:PostgreSQL 数据库的模式名(schema 名)
# db_table:PostgreSQL 数据表名(table 名)
# db_user:PostgreSQL 数据库登录的用户名
# db_pass:PostgreSQL 数据库登录的密码
# opt:merge / append / overwrite 三种模式任选一种。
#     说明:merge 模式下,如果 Airtable 中有被删除的行或列,导入数据库时不会删除此前已经导入的这些行或列;overwrite 模式则会让数据库与 Airtable 完全保持一致;append 模式只会添加新增的行或列,不会改动那些在 Airtable 中被修改过的已有记录。