Chapter 05

读写数据源

Parquet、Delta Lake、Iceberg、Hudi——选对存储格式是大数据性能的基础

DataFrameReader / Writer API

Spark 通过统一的 DataFrameReaderspark.read)和 DataFrameWriterdf.write)API 接入各种存储系统。

# 读取:spark.read.format(…).option(…).load(path)
df = spark.read \
    .format("parquet") \
    .option("mergeSchema", "true") \
    .load("s3://bucket/data/")

# 写入:df.write.format(…).mode(…).option(…).save(path)
df.write \
    .format("parquet") \
    .mode("overwrite") \    # overwrite/append/ignore/errorIfExists
    .partitionBy("year", "month") \
    .option("compression", "snappy") \
    .save("s3://bucket/output/")

Parquet:大数据的首选格式

Apache Parquet 是面向列的二进制存储格式,是 Spark 的默认读写格式,也是现代数据湖的基石。

为什么选 Parquet

# 读取 Parquet:利用分区裁剪(Partition Pruning)
df = spark.read.parquet("s3://bucket/orders/year=2024/month=01/")
# 或:自动发现分区
df = spark.read.parquet("s3://bucket/orders/")
df.filter("year=2024 AND month=01")  # Spark 自动转换为分区路径过滤

# 写入 Parquet:按日期分区
df.write \
    .mode("append") \
    .partitionBy("year", "month", "day") \
    .option("compression", "zstd") \    # ZSTD 压缩率更高(推荐 Spark 3.x)
    .parquet("s3://bucket/orders/")

# 合并 Schema(兼容不同版本的文件)
spark.read.option("mergeSchema", "true").parquet("s3://bucket/orders/")

表格式对比:Delta Lake / Iceberg / Hudi

普通 Parquet 文件不支持 ACID 事务,多个并发写入会损坏数据。现代表格式(Table Format)在 Parquet 之上添加了事务层、元数据管理和 Schema 演化能力。

特性Delta LakeApache IcebergApache Hudi
ACID 事务支持支持支持
时间旅行支持(版本号/时间戳)支持(快照 ID)支持(时间戳)
Schema 演化支持(增/删/改/重排列)支持(完整)支持(增列)
行级更新/删除Merge IntoMerge Into内置 Upsert
流批一体支持支持支持(增量读)
社区/生态Databricks 主导,极其活跃Netflix/Apple,厂商中立Uber 主导,流式场景强
推荐场景Databricks 用户首选多引擎(Flink+Spark)场景实时增量数据湖
# Delta Lake 使用示例
# pip install delta-spark
from delta import configure_spark_with_delta_pip

spark = configure_spark_with_delta_pip(
    SparkSession.builder
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
).getOrCreate()

# 写入 Delta 表
df.write.format("delta").mode("overwrite").save("s3://bucket/delta/orders")

# 时间旅行:查看历史版本
spark.read.format("delta") \
    .option("versionAsOf", "3") \
    .load("s3://bucket/delta/orders")

# Merge Into(Upsert)
spark.sql("""
    MERGE INTO delta.`s3://bucket/delta/orders` AS t
    USING new_orders AS s
    ON t.order_id = s.order_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

Kafka Source(流数据读取)

# 读取 Kafka 数据(批量模式)
df_kafka = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "earliest") \
    .load()

# Kafka 消息的 value 是二进制,需要解析
df_parsed = df_kafka.select(
    F.col("key").cast("string"),
    F.from_json(
        F.col("value").cast("string"),
        schema  # 事先定义好消息的 Schema
    ).alias("data"),
    "topic", "partition", "offset", "timestamp"
).select("data.*")  # 展开嵌套字段

云存储路径格式

平台路径格式说明
AWS S3s3://bucket/path/需配置 AWS 凭证
AWS S3As3a://bucket/path/推荐:更优的多线程上传
HDFShdfs://namenode:8020/path/Hadoop 分布式文件系统
GCS(GCP)gs://bucket/path/Google Cloud Storage
ADLS(Azure)abfss://container@account.dfs.core.windows.net/path/Azure Data Lake Storage Gen2
本地文件file:///absolute/path/ 或 相对路径仅本地模式

格式选型建议:新项目优先使用 Delta Lake(如果在 Databricks 或 AWS 上)或 Apache Iceberg(多引擎、厂商中立场景)。纯批处理、无更新需求的场景使用 Parquet 即可。避免在生产中使用 CSV/JSON 作为中间存储格式。