"""
读取文件,导出mongo [带嵌套list] 的数据,展开.
问题描述:
形如这样的数据,展开:
old数据:
{"_id":123,"class":"good","queslist":[{"_id":123,"name":"yyl"},{"_id":124,"name":"eli"},{"_id":125,"name":"coin"}]}
new数据格式:
{"_id":123,"class":"good","queslist__id":123,"queslist_name":"yyl"}
{"_id":123,"class":"good","queslist__id":124,"queslist_name":"eli"}
{"_id":123,"class":"good","queslist__id":125,"queslist_name":"coin"}
"""
import ast
import datetime
import os
import re

import numpy as np
import pandas as pd
# Field separator used when writing the flattened output file.
SEP_CHAR = "\001"
# Number of CSV rows read per chunk.
CHUNK_SIZE = 2
# Input CSV file converted when this script runs.
FILE_NAME = "jobs.csv"

# Comma-separated names of the input columns; the last one holds the nested list.
COL = "_id,_class,queslist"
COLNUMS = COL.split(",")
COL_OF_LIST = COLNUMS[-1]

# Comma-separated keys extracted from every dict inside the nested list.
COLS_OF_LIST_DICT_str = "_id,name"
COLS_OF_LIST_DICT_OLD = COLS_OF_LIST_DICT_str.split(",")

# Echo the resolved column configuration, one value per line.
print(COLNUMS, COL_OF_LIST, sep="\n")
def de_Nested(df: pd.DataFrame, scalar_cols=None, list_col=None, dict_keys=None):
    """Expand a column of nested lists of dicts into one output row per dict.

    Each input row is repeated once per element of its nested list. The
    result contains the scalar columns, then the list column (now holding a
    single dict per row), then one ``"<list_col>_<key>"`` column per
    requested key.

    Args:
        df: Frame whose ``list_col`` cells are already-parsed lists of dicts.
        scalar_cols: Columns copied next to every expanded element. Defaults
            to all entries of the module-level ``COLNUMS`` except the last.
        list_col: Name of the column holding the nested lists. Defaults to
            the module-level ``COL_OF_LIST``.
        dict_keys: Keys looked up in every dict with ``dict.get``, so a
            missing key does not raise. Defaults to the module-level
            ``COLS_OF_LIST_DICT_OLD``.

    Returns:
        A new DataFrame with one row per nested element; ``df`` itself is
        only read. A frame without rows yields an empty result that still
        has all output columns.
    """
    if scalar_cols is None:
        scalar_cols = COLNUMS[:-1]
    if list_col is None:
        list_col = COL_OF_LIST
    if dict_keys is None:
        dict_keys = COLS_OF_LIST_DICT_OLD

    nested = df[list_col].tolist()
    lens = [len(items) for items in nested]
    print("lens:", lens)
    # An explicit integer dtype keeps the repeat counts valid even when there
    # are no rows (an empty list would otherwise be inferred as float).
    counts = np.asarray(lens, dtype=np.intp)

    respd = pd.DataFrame({})
    for col in scalar_cols:
        respd[col] = np.repeat(df[col].values, counts)

    # Flatten in plain Python so that zero rows or empty lists need no
    # special handling, and store every element as one object cell.
    flattened = [item for items in nested for item in items]
    respd[list_col] = pd.Series(flattened, dtype=object).to_numpy()

    for key in dict_keys:
        col_new = "{}_{}".format(list_col, key)
        # Pick the fixed field out of every nested dict.
        respd[col_new] = respd[list_col].map(lambda item, key=key: item.get(key))
    return respd
# Matches a complete quoted string (either quote style, backslash escapes
# honoured) or a bare JSON literal; only the bare literal is captured.
_JSON_TOKEN_RE = re.compile(
    r'"(?:\\.|[^"\\])*"'
    r"|'(?:\\.|[^'\\])*'"
    r"|\b(true|false|null)\b"
)

# Python source text substituted for each bare JSON literal. Booleans are
# deliberately turned into the strings "true"/"false".
_JSON_LITERAL_SOURCE = {"true": '"true"', "false": '"false"', "null": "None"}


def _replace_bare_literal(match):
    """Return the replacement for one token: strings pass through unchanged."""
    word = match.group(1)
    if word is None:
        return match.group(0)
    return _JSON_LITERAL_SOURCE[word]


def str_2_Array(x):
    """Parse the textual list stored in the nested-list column.

    Bare JSON ``true`` / ``false`` become the strings ``"true"`` /
    ``"false"`` and bare ``null`` becomes ``None``; the text is then
    evaluated with ``ast.literal_eval``. Text inside quoted strings (keys or
    values such as ``"construe"``) is left untouched.

    Args:
        x: String holding a JSON-like or Python-literal list of dicts.

    Returns:
        The object produced by ``ast.literal_eval`` (a list for list input).

    Raises:
        ValueError, SyntaxError: If the text is not a valid literal.
    """
    return ast.literal_eval(_JSON_TOKEN_RE.sub(_replace_bare_literal, x))
def trans_file(filename=None):
    """Convert a CSV file with a nested-list column into a flat text file.

    The input is read in chunks of ``CHUNK_SIZE`` rows. For every chunk the
    ``COL_OF_LIST`` column is parsed with ``str_2_Array`` and expanded with
    ``de_Nested``. The expanded rows of all chunks are written, without
    header or index, separated by ``SEP_CHAR`` and unquoted, to a file named
    like the input with its extension replaced by ``.txt``.

    Args:
        filename: Path of the CSV file to convert.
    """
    print('转换csv文件:_开始时间:{}'.format(str(datetime.datetime.now())))
    # quoting=0 is QUOTE_MINIMAL (the default): double-quoted fields are
    # unquoted while reading. For reference: QUOTE_MINIMAL = 0,
    # QUOTE_ALL = 1, QUOTE_NONNUMERIC = 2, QUOTE_NONE = 3.
    # chunksize makes read_csv return an iterator of CHUNK_SIZE-row frames
    # so large files never have to fit in memory at once.
    df_chunk = pd.read_csv(filename, chunksize=CHUNK_SIZE, encoding="utf-8", iterator=True, quoting=0)
    total_lines = 0
    try:
        # Replace only the final extension, so paths such as "./jobs.csv" or
        # "export.v2.csv" keep their directory and stem.
        res_file = os.path.splitext(filename)[0] + ".txt"
        for chunk_no, chunk in enumerate(df_chunk):
            total_lines += chunk.shape[0]
            print("Totle_lines:", total_lines)
            # Turn the textual list in the last column into list objects.
            chunk[COL_OF_LIST] = chunk[COL_OF_LIST].apply(str_2_Array)
            df = de_Nested(chunk)  # one row per nested element
            print(df.columns)
            # The first chunk replaces any existing output; later chunks are
            # appended so every chunk ends up in the file.
            # quoting=3 (QUOTE_NONE) writes the values without double quotes.
            df.to_csv(
                res_file,
                header=False,
                index=False,
                na_rep='',
                sep=SEP_CHAR,
                quoting=3,
                mode="w" if chunk_no == 0 else "a",
            )
    finally:
        # Release the input file handle even if a chunk fails to convert.
        df_chunk.close()
    print('转换csv文件:_结束时间:{}'.format(str(datetime.datetime.now())))
    print('_完成!_处理记录数:{}\n数据文件保存至>{}'.format(str(total_lines), res_file))
# Script entry: convert the configured input file as soon as the module runs.
trans_file(filename=FILE_NAME)