Chapter 07

性能调优

从 Spark UI 读懂执行计划,解决数据倾斜,利用 AQE 让 Spark 自我优化

理解 Spark UI

Spark UI(默认端口 4040)是调优最重要的工具。应用运行时通过浏览器访问,可以看到每个 Job、Stage、Task 的详细执行信息。

Spark UI 关键页面

数据倾斜:定位与解决

数据倾斜(Data Skew)是 Spark 最常见的性能问题。当某些 Key 的数据量远大于其他 Key 时,处理这些 Key 的 Task 会成为整个 Job 的瓶颈(Stage 要等最慢的 Task 完成)。

如何定位

在 Spark UI 的 Stage 详情中,查看 Task 的 Duration 分布。如果绝大多数 Task 在几秒内完成,但有 1~2 个 Task 需要数十分钟,就是典型的数据倾斜。

# 快速定位倾斜 Key
df.groupBy("join_key").count().orderBy(F.desc("count")).show(20)
# 如果 top 1 的 Key 数量是 top 2 的 100 倍,就是严重倾斜

解决方案一:加盐(Salting)

import random
from pyspark.sql import functions as F

SALT = 20  # 盐值数量,将热点 Key 分散到 20 个分区

# 大表:随机添加盐值(0-19)
df_large_salted = df_large.withColumn(
    "join_key_salted",
    F.concat(F.col("join_key"), F.lit("_"), (F.rand() * SALT).cast("int"))
)

# 小表:复制 SALT 份(每个盐值一份)
df_small_expanded = df_small.crossJoin(
    spark.range(SALT).withColumnRenamed("id", "salt")
).withColumn(
    "join_key_salted",
    F.concat(F.col("join_key"), F.lit("_"), F.col("salt"))
)

# 用加盐后的 Key 执行 Join
result = df_large_salted.join(df_small_expanded, "join_key_salted").drop("join_key_salted")

解决方案二:repartition / coalesce

# 重新分区(触发 Shuffle,可以增加或减少分区数)
df.repartition(200)                    # 随机重分区到 200 个分区
df.repartition(200, F.col("user_id")) # 按 user_id 列哈希分区(相同 key 在同一分区)

# coalesce:只能减少分区,不触发 Shuffle(合并现有分区)
df.coalesce(10)  # 写小文件前用于合并分区

广播连接 broadcast()

当 Join 的一侧是小表(通常 < 10MB,最多建议不超过 200MB),可以将小表广播到所有 Executor,避免大表 Shuffle。

# 广播 Join:显式广播(推荐,明确意图)
result = df_large.join(
    F.broadcast(df_small),
    on="category_id",
    how="left"
)

# 配置自动广播阈值(默认 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m")  # 100MB 以下自动广播
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")    # 禁用自动广播

持久化:cache() / persist()

当一个 DataFrame 被多次使用(如先聚合再 Join 再过滤),应将其持久化(缓存),避免重复计算。

存储级别说明适用场景
MEMORY_ONLY只在内存(Java对象)数据量小,内存足够
MEMORY_AND_DISK内存不足时溢出到磁盘默认推荐(cache()的默认级别)
MEMORY_ONLY_SER序列化后存内存,节省空间内存紧张但 CPU 充裕
DISK_ONLY只存磁盘数据极大,访问频率低
OFF_HEAP堆外内存避免 GC 影响
from pyspark.storagelevel import StorageLevel

df.cache()                                    # 等价于 persist(MEMORY_AND_DISK)
df.persist(StorageLevel.MEMORY_AND_DISK_SER)   # 序列化存储(节省内存)
df.unpersist()                                 # 释放缓存(用完后记得释放)

# cache 在触发 Action 后才真正写入内存
df_cached = df.filter(...).cache()
df_cached.count()  # 触发缓存

AQE:自适应查询优化(Spark 3.x)

AQE(Adaptive Query Execution)是 Spark 3.0 引入的重要特性,让 Spark 在运行时根据真实统计数据动态调整执行计划,无需手动调优。

# AQE 配置(Spark 3.x 默认开启)
spark.conf.set("spark.sql.adaptive.enabled", "true")                   # 开启 AQE
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") # 动态合并分区
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")           # 自动处理倾斜 Join
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")

内存配置

# spark-submit 关键内存参数
spark-submit \
  --driver-memory 8g \           # Driver 内存(collect 大量数据时需要增大)
  --executor-memory 16g \        # 每个 Executor 内存
  --executor-cores 4 \           # 每个 Executor 的 CPU 核数
  --num-executors 20 \           # Executor 数量(YARN 模式)
  --conf spark.memory.fraction=0.8 \       # JVM 堆中用于 Spark 的比例(默认 0.6)
  --conf spark.memory.storageFraction=0.5 \ # Spark 内存中用于缓存的比例
  --conf spark.sql.shuffle.partitions=400 \ # Shuffle 后的分区数(根据数据量调整)
  app.py

调优黄金法则:先用 Spark UI 定位瓶颈(慢在哪个 Stage/Task),再针对性解决。不要盲目调大内存——很多性能问题是数据倾斜或分区数不合理导致的,加内存无济于事。