Chapter 10

性能优化与大数据处理

Categorical 类型、内存压缩、chunk 策略、Polars 对比——让 Pandas 处理更大的数据

理解 Pandas 的性能瓶颈

Pandas 的性能在中小数据集(几百万行以内)上通常已经足够。当数据量增大或处理速度变慢时,问题通常来自以下几个方面:

Categorical 类型:重复值的终极节省

Categorical(分类类型)是 Pandas 内存优化中效果最显著的工具之一。它的原理是:对于取值有限的列(如性别、省份、产品类别),不重复存储每个字符串,而是维护一个"类别目录"(categories),每行只存储一个小整数(指向目录中的位置)。

import pandas as pd
import numpy as np

# 模拟100万行数据,有一个低基数的字符串列
n = 1_000_000
df = pd.DataFrame({
    'id':       range(n),
    'province': np.random.choice(['广东', '浙江', '江苏', '山东', '河南'], n),
    'gender':   np.random.choice(['male', 'female'], n),
    'product':  np.random.choice([f'SKU{i:04d}' for i in range(100)], n),
    'amount':   np.random.randint(10, 1000, n),
})

# 查看内存使用(object 类型)
print("原始内存:", df.memory_usage(deep=True).sum() / 1024**2, "MB")
# 约 140 MB

# 将低基数列转为 category
for col in ['province', 'gender', 'product']:
    df[col] = df[col].astype('category')

print("优化后内存:", df.memory_usage(deep=True).sum() / 1024**2, "MB")
# 约 12 MB(节省 90%+!)

# Categorical 的属性
print(df['province'].cat.categories)  # 所有唯一值
print(df['province'].cat.codes)       # 底层整数编码
print(df['province'].nunique())      # 5 个唯一值

# 添加新类别
df['province'] = df['province'].cat.add_categories(['北京', '上海'])

# 去除未使用的类别
df['province'] = df['province'].cat.remove_unused_categories()
💡
何时使用 Categorical 当列的唯一值数量(基数)远小于总行数时,Categorical 能显著节省内存:唯一值 / 总行数 < 50% 时通常值得转换。典型场景:省份(31个唯一值 vs 数百万行)、用户等级(3-5个值)、产品类别(几百个值 vs 数千万行)。

Categorical 与 groupby 的性能提升

import time

# groupby 性能对比(100万行)

# object 类型的 groupby
df_obj = df.copy()
df_obj['province'] = df_obj['province'].astype('object')
start = time.time()
_ = df_obj.groupby('province')['amount'].sum()
print(f"object groupby: {time.time()-start:.3f}s")   # 约 0.12s

# category 类型的 groupby
start = time.time()
_ = df.groupby('province', observed=True)['amount'].sum()
print(f"category groupby: {time.time()-start:.3f}s")  # 约 0.03s(4倍+提升)

数据类型优化:减少内存使用

import pandas as pd
import numpy as np

def optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """自动压缩 DataFrame 的数据类型,减少内存占用"""
    df = df.copy()

    for col in df.select_dtypes(include=['int64']).columns:
        col_min = df[col].min()
        col_max = df[col].max()
        if col_min >= np.iinfo(np.int8).min and col_max <= np.iinfo(np.int8).max:
            df[col] = df[col].astype(np.int8)    # -128 ~ 127
        elif col_min >= np.iinfo(np.int16).min and col_max <= np.iinfo(np.int16).max:
            df[col] = df[col].astype(np.int16)   # -32768 ~ 32767
        elif col_min >= np.iinfo(np.int32).min and col_max <= np.iinfo(np.int32).max:
            df[col] = df[col].astype(np.int32)   # ~±21亿
        # else: 保持 int64

    for col in df.select_dtypes(include=['float64']).columns:
        # float32 精度约7位十进制数,float64约15位
        df[col] = df[col].astype(np.float32)

    for col in df.select_dtypes(include=['object']).columns:
        if df[col].nunique() / len(df) < 0.5:  # 唯一值比例 < 50%
            df[col] = df[col].astype('category')

    return df

# 使用示例
df_raw = pd.read_csv('large_data.csv')
before = df_raw.memory_usage(deep=True).sum() / 1024**2
df_opt = optimize_dtypes(df_raw)
after = df_opt.memory_usage(deep=True).sum() / 1024**2
print(f"内存:{before:.1f} MB → {after:.1f} MB(减少 {(1-after/before)*100:.0f}%)")

大文件的 chunk 处理策略

import pandas as pd
from pathlib import Path

# ── 策略1:filter + aggregate(最常用)──
# 场景:从 10GB 的日志文件中统计每小时的错误数
hourly_errors = {}

for chunk in pd.read_csv('big_log.csv', chunksize=500_000):
    # 只处理 ERROR 级别的日志
    errors = chunk[chunk['level'] == 'ERROR'].copy()
    errors['hour'] = pd.to_datetime(errors['timestamp']).dt.floor('h')
    # 分组计数
    chunk_counts = errors.groupby('hour').size()
    # 累积合并
    for hour, count in chunk_counts.items():
        hourly_errors[hour] = hourly_errors.get(hour, 0) + count

result = pd.Series(hourly_errors).sort_index()

# ── 策略2:CSV → Parquet 转换(一次转换,多次受益)──
def csv_to_parquet_chunks(csv_path, parquet_path, chunksize=500_000):
    """将大 CSV 文件分块转换为 Parquet"""
    for i, chunk in enumerate(pd.read_csv(csv_path, chunksize=chunksize)):
        chunk.to_parquet(
            f"{parquet_path}/part_{i:04d}.parquet",
            engine='pyarrow', index=False
        )
        print(f"已写入第 {i+1} 块")

# 之后读取时,直接 read_parquet 整个目录
# df = pd.read_parquet('output_parquet/')

PyArrow 后端的实际应用

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# 直接用 PyArrow 读取 Parquet(最快方案)
table = pq.read_table('data.parquet', columns=['date', 'amount'])
df = table.to_pandas()

# 用 PyArrow 后端读取 CSV(内存更小)
df_arrow = pd.read_csv('data.csv', dtype_backend='pyarrow')
print(df_arrow.dtypes)   # 所有类型都是 ArrowDtype
print(df_arrow.memory_usage(deep=True).sum())

# Pandas → PyArrow Table(零拷贝交换)
table = pa.Table.from_pandas(df_arrow)

# 写入高效的 Arrow IPC 文件
import pyarrow.ipc as ipc
with ipc.open_file('data.arrow', table.schema) as writer:
    writer.write(table)

Polars:下一代数据处理框架

Polars 是 2021 年以来快速崛起的 Python 数据处理库,用 Rust 编写,底层使用 Apache Arrow 内存格式。它的主要优势是:

Polars 的优势

  • 真正的多线程并行(Pandas 单线程)
  • Lazy 执行引擎(自动查询优化)
  • 内存效率极高(Arrow 列式存储)
  • 处理 10GB+ 数据不换行
  • API 设计更一致,无 index 系统

Pandas 的优势

  • 生态成熟,与 sklearn/matplotlib 无缝集成
  • 教程、文档、Stack Overflow 资源丰富
  • Index 系统方便时序数据操作
  • PyArrow 后端已大幅改善性能
  • Jupyter 显示效果更美观
# ── Polars 基础语法对比 ──
import polars as pl

# Pandas 写法
result_pd = (
    df_pandas
    .query('amount > 100')
    .groupby(['region', 'product'])
    .agg(total=('amount', 'sum'))
    .sort_values('total', ascending=False)
)

# Polars 等价写法(Lazy API)
result_pl = (
    pl.scan_csv('data.csv')      # 懒加载
    .filter(pl.col('amount') > 100)
    .group_by(['region', 'product'])
    .agg(pl.sum('amount').alias('total'))
    .sort('total', descending=True)
    .collect()           # 触发执行
)

# Polars → Pandas(如需在 Pandas 生态中继续使用)
df_pd = result_pl.to_pandas()

DuckDB:内存不够时的解决方案

DuckDB 是一个嵌入式 OLAP 数据库,与 Pandas 深度集成,可以直接对 DataFrame 执行 SQL 查询,并且能够处理超出内存的数据集(流式处理磁盘数据)。

# pip install duckdb
import duckdb
import pandas as pd

# DuckDB 可以直接查询 Pandas DataFrame!
df = pd.read_csv('sales.csv')

result = duckdb.sql("""
    SELECT region, product, SUM(amount) as total, COUNT(*) as orders
    FROM df
    WHERE amount > 100
    GROUP BY region, product
    ORDER BY total DESC
    LIMIT 20
""").df()  # .df() 将结果转为 Pandas DataFrame

# 直接查询 CSV/Parquet 文件(不需要先读入内存)
result = duckdb.sql("""
    SELECT *
    FROM read_csv_auto('huge_file.csv')
    WHERE amount > 10000
    LIMIT 1000
""").df()

# 查询整个 Parquet 目录(自动合并多文件)
result = duckdb.sql("""
    SELECT year, SUM(amount) as total
    FROM read_parquet('data/**/*.parquet')
    GROUP BY year
""").df()

性能分析与 profiling

import pandas as pd
import time
from memory_profiler import memory_usage  # pip install memory_profiler

# 时间测量
start = time.perf_counter()
# ... 你的代码 ...
elapsed = time.perf_counter() - start
print(f"耗时: {elapsed:.3f}s")

# Jupyter 中使用魔法命令
# %time df.groupby('dept')['salary'].mean()     # 单次运行
# %timeit df.groupby('dept')['salary'].mean()   # 多次运行取平均
# %memit df = pd.read_csv('large.csv')          # 内存使用

# 使用 pandas-profiling 自动分析(pip install ydata-profiling)
from ydata_profiling import ProfileReport
profile = ProfileReport(df, title="数据分析报告")
profile.to_file("report.html")  # 生成 HTML 报告,含每列的分布、缺失值统计等

Pandas 在现代数据栈中的定位

随着 Polars、DuckDB、Spark 等工具的发展,Pandas 在数据栈中的定位正在演变。一个务实的建议是:

数据规模推荐工具适用场景
< 100MBPandas(默认)日常分析、快速探索
100MB - 10GBPandas + PyArrow / Polars生产级数据管道、ETL
10GB - 1TBDuckDB / Polars本地大数据分析
> 1TBSpark / Databricks分布式处理
ℹ️
Pandas 的未来:Pandas 3.0 Pandas 3.0(预计 2025-2026 年发布)将强制启用 Copy-on-Write,彻底移除基于 NumPy 后端的传统 dtype(改为必须显式选择 NumPy 或 PyArrow 后端),并进一步与 Apache Arrow 生态深度融合。这将是一次重大的 API 清理,但也会带来显著的性能和一致性提升。现在学习 Pandas 2.x 是为 3.0 做好准备的最佳方式。

生产代码最佳实践总结

# ✅ DO:推荐做法

# 1. 读取时指定 dtype
df = pd.read_csv('data.csv', dtype={'id': 'int32', 'category': 'category'})

# 2. 优先使用 Parquet 格式
df.to_parquet('data.parquet', engine='pyarrow')

# 3. 链式操作,CoW 安全
result = (df.query('amount > 0').assign(tax=df['amount'] * 0.1))

# 4. 用 loc 赋值修改数据
df.loc[df['status'] == 'pending', 'priority'] = 'high'

# 5. groupby 用 observed=True(category 键)
df.groupby('category', observed=True)['value'].sum()

# 6. 字符串列用 string 而非 object
df['name'] = df['name'].astype('string')

# ❌ DON'T:避免的做法

# 1. 避免 iterrows(用向量化代替)
# for _, row in df.iterrows():  # 极慢!

# 2. 避免链式赋值
# df[condition]['col'] = val   # CoW 下无效

# 3. 避免循环中的 concat
# for item in items:           # 每次都创建新 DataFrame,O(n²)
#     df = pd.concat([df, pd.DataFrame([item])])
# 改用:dfs=[]; dfs.append(...); pd.concat(dfs)

# 4. 避免无谓的 apply
# df['double'] = df['value'].apply(lambda x: x * 2)  # 慢
# df['double'] = df['value'] * 2  # 向量化,快

结语:持续学习的路径

恭喜完成了本教程的全部 10 章内容!你已经掌握了 Pandas 2.x 的核心特性和最佳实践。接下来的学习路径建议: