Chapter 10

与现代数据栈集成

Delta Lake + dbt + Kafka + Iceberg——构建可靠、可扩展的现代数据平台架构

Delta Lake:ACID 事务与时间旅行

Delta Lake 是建立在 Parquet 文件之上的开源存储层,通过 事务日志(_delta_log/)为数据湖带来数据库级别的可靠性保证。

核心特性

from delta.tables import DeltaTable

# 写入 Delta 表
df.write.format("delta").mode("overwrite").save("/data/orders")

# 时间旅行:按版本查询
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("/data/orders")

# 时间旅行:按时间戳查询
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-15 00:00:00") \
    .load("/data/orders")

# 查看历史
delta_table = DeltaTable.forPath(spark, "/data/orders")
delta_table.history(10).show()

# 合并优化(压缩小文件并按 user_id 排序)
spark.sql("OPTIMIZE delta.`/data/orders` ZORDER BY (user_id, order_date)")

# 清理旧版本(释放存储空间)
delta_table.vacuum(retentionHours=168)  # 保留最近 7 天的历史

Lakehouse 架构

Lakehouse 是结合了数据仓库(可靠性、ACID、Schema)和数据湖(低成本、灵活、多格式)优点的新型架构范式,由 Databricks 于 2020 年提出。

Lakehouse 分层架构 ┌───────────────────────────────────────────────────────┐ │ 消费层 │ │ BI 工具 │ ML 模型 │ 实时 API │ Ad-hoc SQL │ └──────────────────────┬────────────────────────────────┘ ┌───────────────────────────────────────────────────────┐ │ Gold 层(业务聚合表) │ │ fact_orders │ dim_users │ mart_daily_revenue │ ├───────────────────────────────────────────────────────┤ │ Silver 层(清洗/标准化) │ │ 去重、类型转换、异常过滤、关联维表 │ ├───────────────────────────────────────────────────────┤ │ Bronze 层(原始数据落地) │ │ Kafka → 原始 JSON → Delta/Parquet(保留原始状态) │ └──────────────────────┬────────────────────────────────┘ 云对象存储(S3 / GCS / ADLS)

与 dbt 集成(dbt-spark)

dbt(data build tool)是现代数据转换的标准工具,用 SQL 定义转换逻辑,自动管理依赖关系和测试。dbt-spark 适配器让 dbt 直接在 Spark 上运行。

# profiles.yml:配置 Spark 连接
my_project:
  target: dev
  outputs:
    dev:
      type: spark
      method: thrift       # 通过 Spark Thrift Server 连接
      host: localhost
      port: 10001
      schema: dw
      cluster: dev_cluster

# 或连接 Databricks
    databricks:
      type: databricks
      host: my-workspace.azuredatabricks.net
      http_path: /sql/1.0/warehouses/xxx
      token: "{{ env_var('DATABRICKS_TOKEN') }}"
      schema: dw
-- models/gold/mart_daily_revenue.sql
{{ config(
    materialized='incremental',      -- 增量模式:只处理新增数据
    file_format='delta',
    incremental_strategy='merge',
    unique_key='order_date'
) }}

SELECT
    DATE(order_time)          AS order_date,
    category,
    COUNT(DISTINCT order_id)  AS order_cnt,
    SUM(amount)               AS gmv,
    COUNT(DISTINCT user_id)   AS uv
FROM {{ ref('silver_orders') }}   -- 引用 silver 层,自动解析依赖
{% if is_incremental() %}
WHERE DATE(order_time) >= DATE_SUB(current_date(), 3)  -- 增量:只处理近 3 天
{% endif %}
GROUP BY 1, 2

Spark + Kafka 实时数仓

# Lambda/Kappa 架构实现:Structured Streaming 写入 Delta Lake
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "orders") \
    .load()

# Bronze 层:原始数据落地 Delta,保留所有字段
raw_stream.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://ck/bronze-orders") \
    .outputMode("append") \
    .trigger(processingTime="1 minute") \
    .start("s3://datalake/bronze/orders")

# Silver 层:流式清洗,foreachBatch 批量写入
def upsert_to_silver(batch_df, batch_id):
    batch_df = batch_df.dropDuplicates(["order_id"]).filter("amount > 0")
    DeltaTable.forPath(spark, "s3://datalake/silver/orders") \
        .alias("t").merge(batch_df.alias("s"), "t.order_id = s.order_id") \
        .whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

parsed.writeStream.foreachBatch(upsert_to_silver) \
    .option("checkpointLocation", "s3://ck/silver-orders") \
    .start()

Spark vs Flink 选型

维度Apache SparkApache Flink
计算模型微批(Micro-batch),最低延迟约 100ms真正的事件驱动流,延迟可达毫秒级
批处理能力非常强(首选)支持,但不如 Spark 成熟
状态管理Watermark + 状态 API,较简单RocksDB 状态后端,更强大灵活
SQL 支持Spark SQL,成熟,与 Hive 兼容Flink SQL,持续完善中
机器学习MLlib,完整无内置 ML,依赖外部框架
社区/生态极其庞大,文档齐全成熟,尤其欧洲/阿里
选择场景批处理 / 准实时(>100ms 延迟可接受)/ ML超低延迟流处理 / 复杂状态计算

PySpark 最佳实践

代码组织

# 推荐项目结构
# my_spark_project/
# ├── jobs/             # Spark 作业入口
# │   ├── daily_etl.py
# │   └── user_report.py
# ├── transforms/       # 可复用的转换函数
# │   ├── cleaning.py
# │   └── aggregations.py
# ├── tests/            # 单元测试
# └── conf/             # 配置文件

# transforms/cleaning.py
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def clean_orders(df: DataFrame) -> DataFrame:
    """清洗订单数据:去除无效行,标准化字段"""
    return df \
        .filter(F.col("order_id").isNotNull()) \
        .filter(F.col("amount") > 0) \
        .withColumn("date", F.to_date("order_time")) \
        .withColumn("amount", F.round("amount", 2))

单元测试

# tests/test_cleaning.py
import pytest
from pyspark.sql import SparkSession
from transforms.cleaning import clean_orders

@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder \
        .master("local[2]") \
        .appName("pytest-spark") \
        .getOrCreate()

def test_clean_orders_removes_nulls(spark):
    raw = spark.createDataFrame([
        (1,   "2024-01-01 10:00:00", 100.0),
        (None, "2024-01-01 11:00:00", 50.0),   # 应被过滤
        (2,   "2024-01-01 12:00:00", -10.0),  # 负金额,应被过滤
    ], ["order_id", "order_time", "amount"])

    result = clean_orders(raw)
    assert result.count() == 1
    assert result.first()["order_id"] == 1

PySpark 开发的黄金法则:避免在 DataFrame 操作中使用 Python UDF(User Defined Function)——Python UDF 每行都需要在 Python 解释器和 JVM 之间序列化,性能极差。优先使用 pyspark.sql.functions 内置函数;复杂逻辑考虑 Pandas UDF(Arrow 向量化传输);最后才考虑普通 Python UDF。

课程总结:Apache Spark 已从大数据批处理框架演化为统一的数据处理平台。掌握本教程的核心知识点:DataFrame API(日常数据操作)、Spark SQL(SQL 分析)、Structured Streaming(实时计算)、性能调优(Spark UI + AQE + 广播连接)、Delta Lake(可靠数据湖),可以应对绝大多数大数据工程场景。