Chapter 03

数据导入与导出

CSV、Excel、JSON、Parquet、SQL——掌握各种数据源的读写策略与大文件处理

Pandas I/O 体系概览

Pandas 提供了覆盖几乎所有主流数据格式的 I/O 接口,命名规范一致:pd.read_xxx() 用于读取,df.to_xxx() 用于写出。

格式读取函数写出方法适用场景
CSVread_csv()to_csv()通用文本格式,兼容性最好
Excelread_excel()to_excel()与 Excel 文件交互
JSONread_json()to_json()API 响应,前后端数据交换
Parquetread_parquet()to_parquet()大数据场景,列式存储
SQLread_sql()to_sql()关系型数据库
HDF5read_hdf()to_hdf()科学计算,大型数值数据
Featherread_feather()to_feather()快速 Arrow 格式交换
Pickleread_pickle()to_pickle()Python 内部序列化(不跨版本)

read_csv:最常用的数据读取函数

pd.read_csv() 是 Pandas 中使用频率最高的函数,拥有超过 50 个参数。掌握其关键参数能让你处理 90% 的 CSV 场景。

基础用法与核心参数

import pandas as pd

# 最基础的用法
df = pd.read_csv('data.csv')

# 常用参数详解
df = pd.read_csv(
    'data.csv',

    # 分隔符相关
    sep=',',          # 分隔符,默认逗号;Tab 分隔用 '\t'
    delimiter=None,   # sep 的别名

    # 标题和索引
    header=0,          # 列名所在行,默认第0行;header=None 表示无列名
    names=['A','B','C'],  # 手动指定列名
    index_col=0,       # 用第0列作为行索引;index_col='id' 按列名指定

    # 数据类型
    dtype={'age': 'Int64', 'name': 'string'},  # 指定列类型
    dtype_backend='pyarrow',  # 使用 PyArrow 后端(2.x 新参数)

    # 缺失值处理
    na_values=['NA', 'NULL', '-', ''],  # 额外的缺失值标记
    keep_default_na=True,              # 是否保留默认缺失值标记

    # 行选择
    skiprows=2,        # 跳过前2行(如标注说明行)
    nrows=1000,        # 只读取前1000行(调试时非常有用)
    skipfooter=1,      # 跳过最后1行

    # 编码与格式
    encoding='utf-8', # 文件编码;中文常见 'gbk' 或 'utf-8-sig'
    thousands=',',    # 千位分隔符('1,000' → 1000)
    decimal='.',      # 小数点符号

    # 日期解析
    parse_dates=['date_col'],  # 自动解析指定列为日期类型

    # 性能相关
    low_memory=False,  # 关闭低内存模式(类型推断更准确)
    engine='c',        # 解析引擎:'c'(快)或 'python'(功能多)
)

处理中文 CSV 的常见问题

# 问题1:编码问题(最常见)
# Windows Excel 保存的 CSV 通常是 GBK 编码
df = pd.read_csv('中文数据.csv', encoding='gbk')

# 或者带 BOM 的 UTF-8
df = pd.read_csv('中文数据.csv', encoding='utf-8-sig')

# 自动检测编码(使用 chardet 库)
import chardet
with open('中文数据.csv', 'rb') as f:
    encoding = chardet.detect(f.read())['encoding']
df = pd.read_csv('中文数据.csv', encoding=encoding)

# 问题2:列名有多余空格
df.columns = df.columns.str.strip()  # 去除列名首尾空格
💡
类型推断陷阱 Pandas 的自动类型推断有时会出错:混合了数字和"N/A"字符串的列会被推断为 object 类型;含有前导零的编号(如 "001")会被推断为整数导致前导零丢失。养成习惯:重要列在读取时就用 dtype 参数明确指定类型,而不是读完再转换。

读取 Excel 文件

# 需要安装:pip install openpyxl
import pandas as pd

# 读取第一个工作表(默认)
df = pd.read_excel('report.xlsx')

# 指定工作表
df = pd.read_excel('report.xlsx', sheet_name='销售数据')
df = pd.read_excel('report.xlsx', sheet_name=1)   # 按位置索引(0-based)

# 一次读取所有工作表,返回字典
all_sheets = pd.read_excel('report.xlsx', sheet_name=None)
for name, df_sheet in all_sheets.items():
    print(f"{name}: {df_sheet.shape}")

# 读取时跳过标题行(如合并单元格标题)
df = pd.read_excel('report.xlsx', header=2, skiprows=[3])

# 写出到 Excel,保留格式
with pd.ExcelWriter('output.xlsx', engine='openpyxl') as writer:
    df1.to_excel(writer, sheet_name='Sheet1', index=False)
    df2.to_excel(writer, sheet_name='Sheet2', index=False)

读取 JSON 数据

JSON 是 API 响应的标准格式,但 JSON 的结构多样,Pandas 的 orient 参数对应不同的 JSON 结构:

import pandas as pd
import json

# orient='records'——最常见的 API 格式
# 对应:[{"name":"Alice","age":25}, {"name":"Bob","age":30}]
df = pd.read_json('data.json', orient='records')

# orient='columns'——列优先格式
# 对应:{"name":{"0":"Alice","1":"Bob"}, "age":{"0":25,"1":30}}
df = pd.read_json('data.json', orient='columns')

# 处理 API 返回的嵌套 JSON
import requests
response = requests.get('https://api.example.com/data')
data = response.json()
# 如果数据在 data['results'] 中
df = pd.DataFrame(data['results'])

# 处理嵌套结构——json_normalize
nested_data = [
    {'name': 'Alice', 'address': {'city': '北京', 'zip': '100000'}},
    {'name': 'Bob',   'address': {'city': '上海', 'zip': '200000'}},
]
df = pd.json_normalize(nested_data)
# 结果:columns = ['name', 'address.city', 'address.zip']

df = pd.json_normalize(nested_data, sep='_')
# 结果:columns = ['name', 'address_city', 'address_zip']

Parquet:数据分析的推荐格式

Parquet 是一种面向列的二进制文件格式,由 Apache 开发。对于数据分析场景,Parquet 在几乎所有方面都优于 CSV:

# 需要安装:pip install pyarrow(推荐)或 fastparquet
import pandas as pd

# 写出 Parquet
df.to_parquet('data.parquet', engine='pyarrow', compression='snappy')

# 读取 Parquet
df = pd.read_parquet('data.parquet')

# 只读取部分列(列式存储的优势)
df = pd.read_parquet('data.parquet', columns=['name', 'salary'])

# 读取目录中的所有分区文件
df = pd.read_parquet('data_partitioned/')

# 性能对比(100万行数据,典型结果)
# CSV 读取:   ~3.2s,文件大小 ~85MB
# Parquet 读取:~0.4s,文件大小 ~12MB(含 Snappy 压缩)
ℹ️
建议 对于需要多次读取的数据集,将 CSV 转换为 Parquet 格式是性价比极高的优化。一行代码:pd.read_csv('data.csv').to_parquet('data.parquet'),后续读取速度提升 5-10 倍,存储空间减少 70%-80%。

从 SQL 数据库读取数据

import pandas as pd
from sqlalchemy import create_engine

# 创建数据库连接(SQLAlchemy 引擎)
engine = create_engine('postgresql://user:password@localhost:5432/mydb')

# 执行 SQL 查询
df = pd.read_sql("SELECT * FROM sales WHERE date >= '2024-01-01'", engine)

# 直接读取整张表
df = pd.read_sql_table('customers', engine)

# 仅执行查询(read_sql_query)
query = """
    SELECT dept, AVG(salary) as avg_salary, COUNT(*) as count
    FROM employees
    GROUP BY dept
    ORDER BY avg_salary DESC
"""
df = pd.read_sql_query(query, engine)

# 写入数据库
df.to_sql('new_table', engine, if_exists='replace', index=False)
# if_exists 选项:'fail'(默认)、'replace'(重建表)、'append'(追加行)

大文件处理策略

当文件超过可用内存时,一次性读取会导致 OOM(内存不足)错误。Pandas 提供了分块读取(chunked reading)的机制。

方法一:chunksize 迭代处理

import pandas as pd

# chunksize 指定每次读取的行数
chunk_size = 100_000  # 每次读取10万行
results = []

for chunk in pd.read_csv('huge_file.csv', chunksize=chunk_size):
    # 对每个 chunk 进行处理,只保留需要的结果
    chunk_result = chunk[chunk['status'] == 'active']['revenue'].sum()
    results.append(chunk_result)

total = sum(results)
print(f"活跃用户总收入:{total}")

# 合并 chunk 为完整 DataFrame(谨慎:只适合结果数据不大的情况)
dfs = []
for chunk in pd.read_csv('huge_file.csv', chunksize=chunk_size):
    filtered = chunk[chunk['country'] == 'CN']
    dfs.append(filtered)

df_cn = pd.concat(dfs, ignore_index=True)

方法二:只读取需要的列

# 大多数情况下不需要所有列
df = pd.read_csv(
    'huge_file.csv',
    usecols=['date', 'product_id', 'revenue'],  # 只读取3列
    dtype={'product_id': 'category', 'revenue': 'float32'},  # 优化类型
    parse_dates=['date'],
)

方法三:使用 Polars 处理超大文件

# Polars 对大文件更友好,可以用 Polars 处理后转为 Pandas
import polars as pl

# Polars 的 lazy API——只在需要时才实际读取数据
df_polars = (
    pl.scan_csv('huge_file.csv')  # 懒加载,不立即读取
    .filter(pl.col('status') == 'active')
    .select(['date', 'revenue'])
    .collect()  # 执行查询
)
# 转为 Pandas(如果后续需要 Pandas API)
df = df_polars.to_pandas()

数据类型推断策略

自动类型推断虽然方便,但存在以下风险:速度慢(需要扫描全列)、推断错误(整数列含"NA"字符串会变成 object)。最佳实践是在首次探索数据后,为生产代码明确指定 dtype:

# 第一步:快速浏览数据,了解每列的内容
df_sample = pd.read_csv('data.csv', nrows=1000)
print(df_sample.dtypes)
print(df_sample.describe(include='all'))

# 第二步:确定合理的类型后,在正式读取时指定
SCHEMA = {
    'user_id':    'int32',       # 用 int32 代替 int64 节省空间
    'product_id': 'category',   # 重复值多的字符串列用 category
    'price':      'float32',     # 精度要求不高时用 float32
    'status':     'category',
    'description': 'string',    # 唯一字符串用 string 而非 object
}

df = pd.read_csv('data.csv', dtype=SCHEMA, parse_dates=['created_at'])
print(df.memory_usage(deep=True).sum() / 1024**2, 'MB')  # 查看内存占用

数据导出最佳实践

# CSV 导出
df.to_csv(
    'output.csv',
    index=False,        # 通常不需要写出行索引
    encoding='utf-8-sig',  # 带 BOM 的 UTF-8,Excel 可直接打开
    float_format='%.4f',   # 浮点数格式
    date_format='%Y-%m-%d', # 日期格式
)

# Parquet 导出(推荐)
df.to_parquet(
    'output.parquet',
    engine='pyarrow',
    compression='zstd',   # zstd 压缩比 snappy 更好,但略慢
    index=False,
)

# 按列分区存储(大数据集)
df.to_parquet(
    'output_partitioned/',
    partition_cols=['year', 'month'],  # 按年月分区
    engine='pyarrow',
)
⚠️
不要用 Pickle 跨版本共享数据 df.to_pickle() 生成的文件只能在相同或兼容版本的 Pandas/Python 中读取。用于团队协作或长期存储时,请使用 Parquet 或 CSV 等格式无关的格式。Pickle 只适合临时缓存当前 Python 会话的数据。

小结

数据导入是每个分析项目的起点,正确的设置能节省大量后续清洗工作:

下一章将进入数据分析的核心环节——数据清洗,处理现实数据中无处不在的缺失值、重复记录和格式问题。