DataFrameReader / Writer API
Spark 通过统一的 DataFrameReader(spark.read)和 DataFrameWriter(df.write)API 接入各种存储系统。
# 读取:spark.read.format(…).option(…).load(path)
df = spark.read \
.format("parquet") \
.option("mergeSchema", "true") \
.load("s3://bucket/data/")
# 写入:df.write.format(…).mode(…).option(…).save(path)
df.write \
.format("parquet") \
.mode("overwrite") \ # overwrite/append/ignore/errorIfExists
.partitionBy("year", "month") \
.option("compression", "snappy") \
.save("s3://bucket/output/")
Parquet:大数据的首选格式
Apache Parquet 是面向列的二进制存储格式,是 Spark 的默认读写格式,也是现代数据湖的基石。
为什么选 Parquet
- 列式存储:只读取查询涉及的列,对 OLAP 分析场景可跳过 90%+ 的数据
- 高压缩率:同类型数据存储在一起,Snappy/ZSTD 压缩效果比行存高 3~10 倍
- 自带 Schema:文件本身包含数据类型信息,无需外部 DDL
- 谓词下推:文件级别和 Row Group 级别的统计信息(min/max)支持跳过不相关的数据块
- Schema 演化:支持新增列(向后兼容),旧文件中缺失的列读为 null
# 读取 Parquet:利用分区裁剪(Partition Pruning)
df = spark.read.parquet("s3://bucket/orders/year=2024/month=01/")
# 或:自动发现分区
df = spark.read.parquet("s3://bucket/orders/")
df.filter("year=2024 AND month=01") # Spark 自动转换为分区路径过滤
# 写入 Parquet:按日期分区
df.write \
.mode("append") \
.partitionBy("year", "month", "day") \
.option("compression", "zstd") \ # ZSTD 压缩率更高(推荐 Spark 3.x)
.parquet("s3://bucket/orders/")
# 合并 Schema(兼容不同版本的文件)
spark.read.option("mergeSchema", "true").parquet("s3://bucket/orders/")
表格式对比:Delta Lake / Iceberg / Hudi
普通 Parquet 文件不支持 ACID 事务,多个并发写入会损坏数据。现代表格式(Table Format)在 Parquet 之上添加了事务层、元数据管理和 Schema 演化能力。
| 特性 | Delta Lake | Apache Iceberg | Apache Hudi |
|---|---|---|---|
| ACID 事务 | 支持 | 支持 | 支持 |
| 时间旅行 | 支持(版本号/时间戳) | 支持(快照 ID) | 支持(时间戳) |
| Schema 演化 | 支持(增/删/改/重排列) | 支持(完整) | 支持(增列) |
| 行级更新/删除 | Merge Into | Merge Into | 内置 Upsert |
| 流批一体 | 支持 | 支持 | 支持(增量读) |
| 社区/生态 | Databricks 主导,极其活跃 | Netflix/Apple,厂商中立 | Uber 主导,流式场景强 |
| 推荐场景 | Databricks 用户首选 | 多引擎(Flink+Spark)场景 | 实时增量数据湖 |
# Delta Lake 使用示例
# pip install delta-spark
from delta import configure_spark_with_delta_pip
spark = configure_spark_with_delta_pip(
SparkSession.builder
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
).getOrCreate()
# 写入 Delta 表
df.write.format("delta").mode("overwrite").save("s3://bucket/delta/orders")
# 时间旅行:查看历史版本
spark.read.format("delta") \
.option("versionAsOf", "3") \
.load("s3://bucket/delta/orders")
# Merge Into(Upsert)
spark.sql("""
MERGE INTO delta.`s3://bucket/delta/orders` AS t
USING new_orders AS s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Kafka Source(流数据读取)
# 读取 Kafka 数据(批量模式)
df_kafka = spark.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "user-events") \
.option("startingOffsets", "earliest") \
.load()
# Kafka 消息的 value 是二进制,需要解析
df_parsed = df_kafka.select(
F.col("key").cast("string"),
F.from_json(
F.col("value").cast("string"),
schema # 事先定义好消息的 Schema
).alias("data"),
"topic", "partition", "offset", "timestamp"
).select("data.*") # 展开嵌套字段
云存储路径格式
| 平台 | 路径格式 | 说明 |
|---|---|---|
| AWS S3 | s3://bucket/path/ | 需配置 AWS 凭证 |
| AWS S3A | s3a://bucket/path/ | 推荐:更优的多线程上传 |
| HDFS | hdfs://namenode:8020/path/ | Hadoop 分布式文件系统 |
| GCS(GCP) | gs://bucket/path/ | Google Cloud Storage |
| ADLS(Azure) | abfss://container@account.dfs.core.windows.net/path/ | Azure Data Lake Storage Gen2 |
| 本地文件 | file:///absolute/path/ 或 相对路径 | 仅本地模式 |
格式选型建议:新项目优先使用 Delta Lake(如果在 Databricks 或 AWS 上)或 Apache Iceberg(多引擎、厂商中立场景)。纯批处理、无更新需求的场景使用 Parquet 即可。避免在生产中使用 CSV/JSON 作为中间存储格式。