CSV->MySQL
创建表
建表语句
Navicat手动建立
MySQL<==>CSV
#sql语句的关键字大写 完整版的sql语句模式,不要用简化模式
import pymysql
import csv
import codecs
def get_conn():
conn = pymysql.connect(
host = 'localhost',
port = 3306,
user = 'root',
passwd = 'root',
db = 'test_csv',
charset = 'utf8' #数据库里是utf8,没有-
)
return conn
def insert(cur,sql,args):
cur.execute(sql,args)
def query_all(cur,sql,args):
cur.execute(sql,args)
return cur.fetchall()
def read_mysql_to_csv(filename):
with codecs.open(filename = filename, mode = 'w', encoding='utf-8') as f:
write = csv.writer(f,dialect = 'excel')
conn = get_conn()
cur = conn.cursor()
sql = 'select * from tb_csv'
results = query_all(cur = cur, sql = sql, args = None)
for rusult in results:
print(result)
write.writerow(result)
def read_csv_to_mysql(filename):
with codecs.open(filename = filename, mode = 'r', encoding = 'utf-8') as f: #和库的编码集保持一致
reader = csv.reader(f)
head = next(reader)
conn = get_conn()
cur = conn.cursor()
sql = 'insert into tb_sb values(%s,%s,%s)'
for item in reader:
if item[1] is None or item[1]=='':
continue
args = tuple(item)
print(args)
insert(cur,sql = sql, args = args)
conn.commit()
cur.close()
conn.close()
if __name__ == '__main__':
read_csv_to_mysql(r'***.myd')
read_mysql_to_csv(r'***.csv')
实战pymysql+pandas
import pymysql
#通过字典设置参数
config = dict(
host="localhost",
port=3306,
user="root",
passwd="liyunhao",
cursorclass = pymysql.cursors.DictCursor
)
#建立连接
conn = pymysql.connect(**config)
#自动提交确认
conn.autocommit(1)
#设置游标对象
cursor = conn.cursor()
import pandas as pd
df = pd.read_csv(r'***.csv',encoding='gbk',usecols=[0,3,4,5,11],parse_dates=['日期'])
print(df.head())
#根据pandas自动识别 设置表的类型
def make_table_sql(df):
columns = df.columns.tolist()
types = df.ftypes
make_table = []
for item in columns:
if 'int' in types[item]:
char = item + ' INT'
elif: 'float' in type[item]:
char = item + ' FLOAT'
elif: 'object' in type[item]:
char = item + ' VARCHAR(255)'
elif: 'datetime' in type[item]:
char = item + ' DATETIME'
make_table.append(char)
return ''.join(make_table)
#创建数据库、创建表、批量写入数据库
def csv2mysql(db_name,table_name,df):
#创建数据库
cursor.execute('CREATE TABLE IF NOT EXISTS {}'.format(db_name))
#连接数据库
conn.select_db(db_name)
#创建表
cursor.execute('CREATE TABLE {}({})'.format(table_name,make_table_sql(df)))
df['日期'] = df['日期'].astype('str')
values = df.values.tolist()
#批量插入数据
s = ','.join(['%s' for _ in range(len(df.columns))])
cursor.executemany('INSERT INTO {} VALUES ({})'.format(table_name,s),values)
cursor.close()
conn.close()
#运行测试
if __name__ == '__main__':
csv2mysql('stock','test1',df)
实战 sqlalchemy
import pandas from sqlalchemy as pd
import create_engine from datetime
import datetime from sqlalchemy.types
import NVARCHAR.Interger
#设置连接
engine = create_engine("mysql+pymysql//root:root@localhost:3306/stock?charset=utf8",max_overflows=6)
#建立连接
con = engine.connect()
#pandsa读取csv文件
df = df = pd.read_csv(r'***.csv',encoding='gbk',usecols=[0,3,4,5,11],parse_dates=['日期'])
#pandas与sql类型的转换
def map_types(df):
dtypedict = {}
#zip():将可迭代的对象作为参数,将对象中对应的元素打包成一个元组,然后返回由这个元组组成的列表
for i,j in zip(df.colnums,df.dtypes):
if 'object' in str(j):
dtypedict.update({i:NVARCHAR(length = 255)})
if 'float' in str(j):
dtypedict.update({i:Float(precision=2,asdecimal = True)})
if 'int' in str(j):
dtypedict.update({i:Integer()})
return dtypedict
#存入操作
dtypedict = map_types(df)
#if_exists
#值有三个
#append 如果存在 把数据加到表的后面
#fail:如果存在就不操作
#replace 如果存在就替换
df.to_sql(name='test2',con=con,if_exists='replace',index=False,dtype = dtypedict)#第三个参数是操作方式