Pandas I/O 体系概览
Pandas 提供了覆盖几乎所有主流数据格式的 I/O 接口,命名规范一致:pd.read_xxx() 用于读取,df.to_xxx() 用于写出。
| 格式 | 读取函数 | 写出方法 | 适用场景 |
|---|---|---|---|
| CSV | read_csv() | to_csv() | 通用文本格式,兼容性最好 |
| Excel | read_excel() | to_excel() | 与 Excel 文件交互 |
| JSON | read_json() | to_json() | API 响应,前后端数据交换 |
| Parquet | read_parquet() | to_parquet() | 大数据场景,列式存储 |
| SQL | read_sql() | to_sql() | 关系型数据库 |
| HDF5 | read_hdf() | to_hdf() | 科学计算,大型数值数据 |
| Feather | read_feather() | to_feather() | 快速 Arrow 格式交换 |
| Pickle | read_pickle() | to_pickle() | Python 内部序列化(不跨版本) |
read_csv:最常用的数据读取函数
pd.read_csv() 是 Pandas 中使用频率最高的函数,拥有超过 50 个参数。掌握其关键参数能让你处理 90% 的 CSV 场景。
基础用法与核心参数
import pandas as pd
# 最基础的用法
df = pd.read_csv('data.csv')
# 常用参数详解
df = pd.read_csv(
'data.csv',
# 分隔符相关
sep=',', # 分隔符,默认逗号;Tab 分隔用 '\t'
delimiter=None, # sep 的别名
# 标题和索引
header=0, # 列名所在行,默认第0行;header=None 表示无列名
names=['A','B','C'], # 手动指定列名
index_col=0, # 用第0列作为行索引;index_col='id' 按列名指定
# 数据类型
dtype={'age': 'Int64', 'name': 'string'}, # 指定列类型
dtype_backend='pyarrow', # 使用 PyArrow 后端(2.x 新参数)
# 缺失值处理
na_values=['NA', 'NULL', '-', ''], # 额外的缺失值标记
keep_default_na=True, # 是否保留默认缺失值标记
# 行选择
skiprows=2, # 跳过前2行(如标注说明行)
nrows=1000, # 只读取前1000行(调试时非常有用)
skipfooter=1, # 跳过最后1行
# 编码与格式
encoding='utf-8', # 文件编码;中文常见 'gbk' 或 'utf-8-sig'
thousands=',', # 千位分隔符('1,000' → 1000)
decimal='.', # 小数点符号
# 日期解析
parse_dates=['date_col'], # 自动解析指定列为日期类型
# 性能相关
low_memory=False, # 关闭低内存模式(类型推断更准确)
engine='c', # 解析引擎:'c'(快)或 'python'(功能多)
)
处理中文 CSV 的常见问题
# 问题1:编码问题(最常见)
# Windows Excel 保存的 CSV 通常是 GBK 编码
df = pd.read_csv('中文数据.csv', encoding='gbk')
# 或者带 BOM 的 UTF-8
df = pd.read_csv('中文数据.csv', encoding='utf-8-sig')
# 自动检测编码(使用 chardet 库)
import chardet
with open('中文数据.csv', 'rb') as f:
encoding = chardet.detect(f.read())['encoding']
df = pd.read_csv('中文数据.csv', encoding=encoding)
# 问题2:列名有多余空格
df.columns = df.columns.str.strip() # 去除列名首尾空格
类型推断陷阱
Pandas 的自动类型推断有时会出错:混合了数字和"N/A"字符串的列会被推断为 object 类型;含有前导零的编号(如 "001")会被推断为整数导致前导零丢失。养成习惯:重要列在读取时就用
dtype 参数明确指定类型,而不是读完再转换。
读取 Excel 文件
# 需要安装:pip install openpyxl
import pandas as pd
# 读取第一个工作表(默认)
df = pd.read_excel('report.xlsx')
# 指定工作表
df = pd.read_excel('report.xlsx', sheet_name='销售数据')
df = pd.read_excel('report.xlsx', sheet_name=1) # 按位置索引(0-based)
# 一次读取所有工作表,返回字典
all_sheets = pd.read_excel('report.xlsx', sheet_name=None)
for name, df_sheet in all_sheets.items():
print(f"{name}: {df_sheet.shape}")
# 读取时跳过标题行(如合并单元格标题)
df = pd.read_excel('report.xlsx', header=2, skiprows=[3])
# 写出到 Excel,保留格式
with pd.ExcelWriter('output.xlsx', engine='openpyxl') as writer:
df1.to_excel(writer, sheet_name='Sheet1', index=False)
df2.to_excel(writer, sheet_name='Sheet2', index=False)
读取 JSON 数据
JSON 是 API 响应的标准格式,但 JSON 的结构多样,Pandas 的 orient 参数对应不同的 JSON 结构:
import pandas as pd
import json
# orient='records'——最常见的 API 格式
# 对应:[{"name":"Alice","age":25}, {"name":"Bob","age":30}]
df = pd.read_json('data.json', orient='records')
# orient='columns'——列优先格式
# 对应:{"name":{"0":"Alice","1":"Bob"}, "age":{"0":25,"1":30}}
df = pd.read_json('data.json', orient='columns')
# 处理 API 返回的嵌套 JSON
import requests
response = requests.get('https://api.example.com/data')
data = response.json()
# 如果数据在 data['results'] 中
df = pd.DataFrame(data['results'])
# 处理嵌套结构——json_normalize
nested_data = [
{'name': 'Alice', 'address': {'city': '北京', 'zip': '100000'}},
{'name': 'Bob', 'address': {'city': '上海', 'zip': '200000'}},
]
df = pd.json_normalize(nested_data)
# 结果:columns = ['name', 'address.city', 'address.zip']
df = pd.json_normalize(nested_data, sep='_')
# 结果:columns = ['name', 'address_city', 'address_zip']
Parquet:数据分析的推荐格式
Parquet 是一种面向列的二进制文件格式,由 Apache 开发。对于数据分析场景,Parquet 在几乎所有方面都优于 CSV:
- 列式存储 只读取需要的列,无需扫描整个文件。1GB 的 CSV 读取 3 列可能只需要读取 100MB。
- 内置压缩 默认使用 Snappy 压缩,文件体积通常是 CSV 的 20%-30%。
- 保留类型 dtype 信息存储在文件元数据中,读取时无需类型推断,速度更快且更准确。
- 分区支持 可以按列值分区存储(如按日期分区),查询特定分区时只读取对应文件。
# 需要安装:pip install pyarrow(推荐)或 fastparquet
import pandas as pd
# 写出 Parquet
df.to_parquet('data.parquet', engine='pyarrow', compression='snappy')
# 读取 Parquet
df = pd.read_parquet('data.parquet')
# 只读取部分列(列式存储的优势)
df = pd.read_parquet('data.parquet', columns=['name', 'salary'])
# 读取目录中的所有分区文件
df = pd.read_parquet('data_partitioned/')
# 性能对比(100万行数据,典型结果)
# CSV 读取: ~3.2s,文件大小 ~85MB
# Parquet 读取:~0.4s,文件大小 ~12MB(含 Snappy 压缩)
建议
对于需要多次读取的数据集,将 CSV 转换为 Parquet 格式是性价比极高的优化。一行代码:
pd.read_csv('data.csv').to_parquet('data.parquet'),后续读取速度提升 5-10 倍,存储空间减少 70%-80%。
从 SQL 数据库读取数据
import pandas as pd
from sqlalchemy import create_engine
# 创建数据库连接(SQLAlchemy 引擎)
engine = create_engine('postgresql://user:password@localhost:5432/mydb')
# 执行 SQL 查询
df = pd.read_sql("SELECT * FROM sales WHERE date >= '2024-01-01'", engine)
# 直接读取整张表
df = pd.read_sql_table('customers', engine)
# 仅执行查询(read_sql_query)
query = """
SELECT dept, AVG(salary) as avg_salary, COUNT(*) as count
FROM employees
GROUP BY dept
ORDER BY avg_salary DESC
"""
df = pd.read_sql_query(query, engine)
# 写入数据库
df.to_sql('new_table', engine, if_exists='replace', index=False)
# if_exists 选项:'fail'(默认)、'replace'(重建表)、'append'(追加行)
大文件处理策略
当文件超过可用内存时,一次性读取会导致 OOM(内存不足)错误。Pandas 提供了分块读取(chunked reading)的机制。
方法一:chunksize 迭代处理
import pandas as pd
# chunksize 指定每次读取的行数
chunk_size = 100_000 # 每次读取10万行
results = []
for chunk in pd.read_csv('huge_file.csv', chunksize=chunk_size):
# 对每个 chunk 进行处理,只保留需要的结果
chunk_result = chunk[chunk['status'] == 'active']['revenue'].sum()
results.append(chunk_result)
total = sum(results)
print(f"活跃用户总收入:{total}")
# 合并 chunk 为完整 DataFrame(谨慎:只适合结果数据不大的情况)
dfs = []
for chunk in pd.read_csv('huge_file.csv', chunksize=chunk_size):
filtered = chunk[chunk['country'] == 'CN']
dfs.append(filtered)
df_cn = pd.concat(dfs, ignore_index=True)
方法二:只读取需要的列
# 大多数情况下不需要所有列
df = pd.read_csv(
'huge_file.csv',
usecols=['date', 'product_id', 'revenue'], # 只读取3列
dtype={'product_id': 'category', 'revenue': 'float32'}, # 优化类型
parse_dates=['date'],
)
方法三:使用 Polars 处理超大文件
# Polars 对大文件更友好,可以用 Polars 处理后转为 Pandas
import polars as pl
# Polars 的 lazy API——只在需要时才实际读取数据
df_polars = (
pl.scan_csv('huge_file.csv') # 懒加载,不立即读取
.filter(pl.col('status') == 'active')
.select(['date', 'revenue'])
.collect() # 执行查询
)
# 转为 Pandas(如果后续需要 Pandas API)
df = df_polars.to_pandas()
数据类型推断策略
自动类型推断虽然方便,但存在以下风险:速度慢(需要扫描全列)、推断错误(整数列含"NA"字符串会变成 object)。最佳实践是在首次探索数据后,为生产代码明确指定 dtype:
# 第一步:快速浏览数据,了解每列的内容
df_sample = pd.read_csv('data.csv', nrows=1000)
print(df_sample.dtypes)
print(df_sample.describe(include='all'))
# 第二步:确定合理的类型后,在正式读取时指定
SCHEMA = {
'user_id': 'int32', # 用 int32 代替 int64 节省空间
'product_id': 'category', # 重复值多的字符串列用 category
'price': 'float32', # 精度要求不高时用 float32
'status': 'category',
'description': 'string', # 唯一字符串用 string 而非 object
}
df = pd.read_csv('data.csv', dtype=SCHEMA, parse_dates=['created_at'])
print(df.memory_usage(deep=True).sum() / 1024**2, 'MB') # 查看内存占用
数据导出最佳实践
# CSV 导出
df.to_csv(
'output.csv',
index=False, # 通常不需要写出行索引
encoding='utf-8-sig', # 带 BOM 的 UTF-8,Excel 可直接打开
float_format='%.4f', # 浮点数格式
date_format='%Y-%m-%d', # 日期格式
)
# Parquet 导出(推荐)
df.to_parquet(
'output.parquet',
engine='pyarrow',
compression='zstd', # zstd 压缩比 snappy 更好,但略慢
index=False,
)
# 按列分区存储(大数据集)
df.to_parquet(
'output_partitioned/',
partition_cols=['year', 'month'], # 按年月分区
engine='pyarrow',
)
不要用 Pickle 跨版本共享数据
df.to_pickle() 生成的文件只能在相同或兼容版本的 Pandas/Python 中读取。用于团队协作或长期存储时,请使用 Parquet 或 CSV 等格式无关的格式。Pickle 只适合临时缓存当前 Python 会话的数据。
小结
数据导入是每个分析项目的起点,正确的设置能节省大量后续清洗工作:
- 读取时就用
dtype参数指定正确类型,避免依赖自动推断 - 用
nrows参数先快速浏览样本数据,再正式读取全量数据 - 长期存储和重复读取的数据,优先使用 Parquet 而非 CSV
- 超过内存限制的大文件,使用
chunksize分块处理或借助 Polars
下一章将进入数据分析的核心环节——数据清洗,处理现实数据中无处不在的缺失值、重复记录和格式问题。