用Pandas处理大规模数据:分块读取与内存优化技巧

霸姨

关注

阅读 8

10-29 09:00

在数据科学项目中,我们经常需要处理远超内存容量的大型数据集(如数GB甚至TB级别的CSV文件)。直接使用pd.read_csv()加载整个文件会导致内存溢出(MemoryError),程序崩溃。幸运的是,Pandas提供了强大的工具来应对这一挑战。本文将介绍如何通过分块读取(Chunking)内存优化技巧,高效处理大规模数据。

1. 分块读取:chunksize参数

Pandas的read_csv()函数支持chunksize参数,允许你将大文件分割成小块(chunks)逐块读取。每一块都是一个独立的DataFrame,你可以对每块进行处理,然后丢弃以释放内存。

import pandas as pd

# 指定每次读取10,000行
chunk_size = 10000
file_path = 'large_dataset.csv'

# 创建一个TextFileReader对象
chunk_iter = pd.read_csv(file_path, chunksize=chunk_size)

# 遍历每个数据块
for i, chunk in enumerate(chunk_iter):
    print(f"正在处理第 {i+1} 块,形状: {chunk.shape}")
    
    # 在这里对chunk进行处理
    # 例如:数据清洗、过滤、聚合等
    processed_chunk = chunk.dropna()  # 示例:删除缺失值
    
    # 可以将处理后的块保存或累加
    # ...

这种方式将内存占用从“整个文件”降低到“单个块”的大小,使处理超大文件成为可能。

2. 实际应用:增量聚合

分块读取常用于计算全局统计量,如总和、平均值、最大值等。你可以在每块上计算部分结果,然后合并。

# 计算整个文件中某一列的总和
total_sum = 0
column_name = 'sales'

for chunk in pd.read_csv(file_path, chunksize=chunk_size):
    total_sum += chunk[column_name].sum()

print(f"{column_name} 列的总和: {total_sum}")

# 计算平均值(需要同时跟踪总和和计数)
total_sum = 0
total_count = 0

for chunk in pd.read_csv(file_path, chunksize=chunk_size):
    total_sum += chunk[column_name].sum()
    total_count += len(chunk)

overall_mean = total_sum / total_count if total_count > 0 else 0
print(f"{column_name} 列的平均值: {overall_mean}")

3. 内存优化:数据类型调整

即使不涉及分块,优化数据类型也能显著减少内存占用。Pandas默认使用较宽泛的数据类型(如int64, float64),但很多时候更小的类型就足够了。

# 读取一小部分数据,分析其数据类型
sample_df = pd.read_csv(file_path, nrows=1000)

# 显示原始内存使用情况
print("原始内存使用:")
print(sample_df.memory_usage(deep=True).sum() / 1024**2, "MB")

# 优化数据类型
optimized_dtypes = {}
for col in sample_df.columns:
    col_type = sample_df[col].dtype
    if col_type != 'object':
        c_min = sample_df[col].min()
        c_max = sample_df[col].max()
        # 对于整数列,选择最小的合适类型
        if str(col_type)[:3] == 'int':
            if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                optimized_dtypes[col] = 'int8'
            elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                optimized_dtypes[col] = 'int16'
            elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                optimized_dtypes[col] = 'int32'
        # 对于浮点数列
        else:
            if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                optimized_dtypes[col] = 'float32'

print("优化后的数据类型:", optimized_dtypes)

在正式读取时应用这些类型:

# 使用优化后的数据类型读取整个文件(或分块读取)
df = pd.read_csv(file_path, dtype=optimized_dtypes)
# 或者结合分块
for chunk in pd.read_csv(file_path, chunksize=chunk_size, dtype=optimized_dtypes):
    # 处理chunk...
    pass

4. 其他内存节省技巧

  • 只读取需要的列:使用usecols参数。

df = pd.read_csv(file_path, usecols=['col1', 'col2', 'col3'])

  • 处理日期列:如果日期列是字符串,可以指定parse_dates让Pandas直接解析为datetime类型,避免后续转换开销。

df = pd.read_csv(file_path, parse_dates=['date_column'])

  • 使用category类型:对于重复度高的文本列(如状态、类别),转换为category能大幅节省内存。

df['category_col'] = df['category_col'].astype('category')

5. 结合使用:分块 + 类型优化

最佳实践是将分块读取与内存优化结合:

# 假设已确定优化的dtypes
final_dtypes = {
    'id': 'int32',
    'value': 'float32',
    'category': 'category'
}

results = []

for chunk in pd.read_csv(file_path, chunksize=10000, dtype=final_dtypes, usecols=final_dtypes.keys()):
    # 在每块上进行业务逻辑处理
    filtered_chunk = chunk[chunk['value'] > 100]
    results.append(filtered_chunk)

# 最后合并所有结果(如果内存允许)
final_result = pd.concat(results, ignore_index=True)

处理大规模数据是数据工程师的必备技能。通过chunksize进行分块读取,配合数据类型优化、列筛选等技巧,你可以轻松应对超出内存限制的数据集。关键在于避免一次性加载全部数据,采用流式处理的思想。掌握这些方法,让你的Pandas代码不仅功能强大,而且高效稳定。

精彩评论(0)

0 0 举报