Delta Lake:ACID 事务与时间旅行
Delta Lake 是建立在 Parquet 文件之上的开源存储层,通过 事务日志(_delta_log/)为数据湖带来数据库级别的可靠性保证。
核心特性
- ACID 事务 并发读写不会看到部分写入的数据。多个 Spark 作业可以同时写入不同分区,互不干扰,不会产生数据损坏。
- 时间旅行 Delta 保存历史版本的事务日志,可以查询任意历史时刻的数据(默认保留 30 天)。用于数据审计、回滚误操作、可复现的数据分析。
- Schema 强制 写入时自动校验 Schema,不符合的数据会被拒绝(而非静默写入错误数据)。
-
Schema 演化
支持向后兼容的 Schema 变更(新增列)。
mergeSchema选项允许写入时自动合并新列。 - OPTIMIZE + ZORDER 合并小文件(OPTIMIZE)并按常用过滤列排序数据(ZORDER BY),大幅提升查询性能。
from delta.tables import DeltaTable
# 写入 Delta 表
df.write.format("delta").mode("overwrite").save("/data/orders")
# 时间旅行:按版本查询
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("/data/orders")
# 时间旅行:按时间戳查询
df_yesterday = spark.read.format("delta") \
.option("timestampAsOf", "2024-01-15 00:00:00") \
.load("/data/orders")
# 查看历史
delta_table = DeltaTable.forPath(spark, "/data/orders")
delta_table.history(10).show()
# 合并优化(压缩小文件并按 user_id 排序)
spark.sql("OPTIMIZE delta.`/data/orders` ZORDER BY (user_id, order_date)")
# 清理旧版本(释放存储空间)
delta_table.vacuum(retentionHours=168) # 保留最近 7 天的历史
Lakehouse 架构
Lakehouse 是结合了数据仓库(可靠性、ACID、Schema)和数据湖(低成本、灵活、多格式)优点的新型架构范式,由 Databricks 于 2020 年提出。
Lakehouse 分层架构
┌───────────────────────────────────────────────────────┐
│ 消费层 │
│ BI 工具 │ ML 模型 │ 实时 API │ Ad-hoc SQL │
└──────────────────────┬────────────────────────────────┘
▼
┌───────────────────────────────────────────────────────┐
│ Gold 层(业务聚合表) │
│ fact_orders │ dim_users │ mart_daily_revenue │
├───────────────────────────────────────────────────────┤
│ Silver 层(清洗/标准化) │
│ 去重、类型转换、异常过滤、关联维表 │
├───────────────────────────────────────────────────────┤
│ Bronze 层(原始数据落地) │
│ Kafka → 原始 JSON → Delta/Parquet(保留原始状态) │
└──────────────────────┬────────────────────────────────┘
▼
云对象存储(S3 / GCS / ADLS)
与 dbt 集成(dbt-spark)
dbt(data build tool)是现代数据转换的标准工具,用 SQL 定义转换逻辑,自动管理依赖关系和测试。dbt-spark 适配器让 dbt 直接在 Spark 上运行。
# profiles.yml:配置 Spark 连接
my_project:
target: dev
outputs:
dev:
type: spark
method: thrift # 通过 Spark Thrift Server 连接
host: localhost
port: 10001
schema: dw
cluster: dev_cluster
# 或连接 Databricks
databricks:
type: databricks
host: my-workspace.azuredatabricks.net
http_path: /sql/1.0/warehouses/xxx
token: "{{ env_var('DATABRICKS_TOKEN') }}"
schema: dw
-- models/gold/mart_daily_revenue.sql
{{ config(
materialized='incremental', -- 增量模式:只处理新增数据
file_format='delta',
incremental_strategy='merge',
unique_key='order_date'
) }}
SELECT
DATE(order_time) AS order_date,
category,
COUNT(DISTINCT order_id) AS order_cnt,
SUM(amount) AS gmv,
COUNT(DISTINCT user_id) AS uv
FROM {{ ref('silver_orders') }} -- 引用 silver 层,自动解析依赖
{% if is_incremental() %}
WHERE DATE(order_time) >= DATE_SUB(current_date(), 3) -- 增量:只处理近 3 天
{% endif %}
GROUP BY 1, 2
Spark + Kafka 实时数仓
# Lambda/Kappa 架构实现:Structured Streaming 写入 Delta Lake
raw_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "orders") \
.load()
# Bronze 层:原始数据落地 Delta,保留所有字段
raw_stream.writeStream \
.format("delta") \
.option("checkpointLocation", "s3://ck/bronze-orders") \
.outputMode("append") \
.trigger(processingTime="1 minute") \
.start("s3://datalake/bronze/orders")
# Silver 层:流式清洗,foreachBatch 批量写入
def upsert_to_silver(batch_df, batch_id):
batch_df = batch_df.dropDuplicates(["order_id"]).filter("amount > 0")
DeltaTable.forPath(spark, "s3://datalake/silver/orders") \
.alias("t").merge(batch_df.alias("s"), "t.order_id = s.order_id") \
.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
parsed.writeStream.foreachBatch(upsert_to_silver) \
.option("checkpointLocation", "s3://ck/silver-orders") \
.start()
Spark vs Flink 选型
| 维度 | Apache Spark | Apache Flink |
|---|---|---|
| 计算模型 | 微批(Micro-batch),最低延迟约 100ms | 真正的事件驱动流,延迟可达毫秒级 |
| 批处理能力 | 非常强(首选) | 支持,但不如 Spark 成熟 |
| 状态管理 | Watermark + 状态 API,较简单 | RocksDB 状态后端,更强大灵活 |
| SQL 支持 | Spark SQL,成熟,与 Hive 兼容 | Flink SQL,持续完善中 |
| 机器学习 | MLlib,完整 | 无内置 ML,依赖外部框架 |
| 社区/生态 | 极其庞大,文档齐全 | 成熟,尤其欧洲/阿里 |
| 选择场景 | 批处理 / 准实时(>100ms 延迟可接受)/ ML | 超低延迟流处理 / 复杂状态计算 |
PySpark 最佳实践
代码组织
# 推荐项目结构
# my_spark_project/
# ├── jobs/ # Spark 作业入口
# │ ├── daily_etl.py
# │ └── user_report.py
# ├── transforms/ # 可复用的转换函数
# │ ├── cleaning.py
# │ └── aggregations.py
# ├── tests/ # 单元测试
# └── conf/ # 配置文件
# transforms/cleaning.py
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
def clean_orders(df: DataFrame) -> DataFrame:
"""清洗订单数据:去除无效行,标准化字段"""
return df \
.filter(F.col("order_id").isNotNull()) \
.filter(F.col("amount") > 0) \
.withColumn("date", F.to_date("order_time")) \
.withColumn("amount", F.round("amount", 2))
单元测试
# tests/test_cleaning.py
import pytest
from pyspark.sql import SparkSession
from transforms.cleaning import clean_orders
@pytest.fixture(scope="session")
def spark():
return SparkSession.builder \
.master("local[2]") \
.appName("pytest-spark") \
.getOrCreate()
def test_clean_orders_removes_nulls(spark):
raw = spark.createDataFrame([
(1, "2024-01-01 10:00:00", 100.0),
(None, "2024-01-01 11:00:00", 50.0), # 应被过滤
(2, "2024-01-01 12:00:00", -10.0), # 负金额,应被过滤
], ["order_id", "order_time", "amount"])
result = clean_orders(raw)
assert result.count() == 1
assert result.first()["order_id"] == 1
PySpark 开发的黄金法则:避免在 DataFrame 操作中使用 Python UDF(User Defined Function)——Python UDF 每行都需要在 Python 解释器和 JVM 之间序列化,性能极差。优先使用 pyspark.sql.functions 内置函数;复杂逻辑考虑 Pandas UDF(Arrow 向量化传输);最后才考虑普通 Python UDF。
课程总结:Apache Spark 已从大数据批处理框架演化为统一的数据处理平台。掌握本教程的核心知识点:DataFrame API(日常数据操作)、Spark SQL(SQL 分析)、Structured Streaming(实时计算)、性能调优(Spark UI + AQE + 广播连接)、Delta Lake(可靠数据湖),可以应对绝大多数大数据工程场景。