理解 Spark UI
Spark UI(默认端口 4040)是调优最重要的工具。应用运行时通过浏览器访问,可以看到每个 Job、Stage、Task 的详细执行信息。
Spark UI 关键页面
- Jobs 页面:列出所有 Job,每个 Job 对应一个 Action。查看哪些 Job 耗时最长
- Stages 页面:Job 内的 Stage 列表,查看 Stage 之间的数据量和 Shuffle Read/Write 大小
- Tasks 视图(Stage 详情):最关键。查看每个 Task 的执行时间分布——如果某个 Task 明显慢于其他 Task,说明存在数据倾斜
- SQL 页面:显示 DataFrame/SQL 的物理执行计划(Physical Plan),可以看到 Spark 选择的 Join 策略、是否应用了谓词下推
- Storage 页面:查看已缓存的 RDD/DataFrame 的内存使用情况
- Environment 页面:查看所有配置参数的实际值
数据倾斜:定位与解决
数据倾斜(Data Skew)是 Spark 最常见的性能问题。当某些 Key 的数据量远大于其他 Key 时,处理这些 Key 的 Task 会成为整个 Job 的瓶颈(Stage 要等最慢的 Task 完成)。
如何定位
在 Spark UI 的 Stage 详情中,查看 Task 的 Duration 分布。如果绝大多数 Task 在几秒内完成,但有 1~2 个 Task 需要数十分钟,就是典型的数据倾斜。
# 快速定位倾斜 Key
df.groupBy("join_key").count().orderBy(F.desc("count")).show(20)
# 如果 top 1 的 Key 数量是 top 2 的 100 倍,就是严重倾斜
解决方案一:加盐(Salting)
import random
from pyspark.sql import functions as F
SALT = 20 # 盐值数量,将热点 Key 分散到 20 个分区
# 大表:随机添加盐值(0-19)
df_large_salted = df_large.withColumn(
"join_key_salted",
F.concat(F.col("join_key"), F.lit("_"), (F.rand() * SALT).cast("int"))
)
# 小表:复制 SALT 份(每个盐值一份)
df_small_expanded = df_small.crossJoin(
spark.range(SALT).withColumnRenamed("id", "salt")
).withColumn(
"join_key_salted",
F.concat(F.col("join_key"), F.lit("_"), F.col("salt"))
)
# 用加盐后的 Key 执行 Join
result = df_large_salted.join(df_small_expanded, "join_key_salted").drop("join_key_salted")
解决方案二:repartition / coalesce
# 重新分区(触发 Shuffle,可以增加或减少分区数)
df.repartition(200) # 随机重分区到 200 个分区
df.repartition(200, F.col("user_id")) # 按 user_id 列哈希分区(相同 key 在同一分区)
# coalesce:只能减少分区,不触发 Shuffle(合并现有分区)
df.coalesce(10) # 写小文件前用于合并分区
广播连接 broadcast()
当 Join 的一侧是小表(通常 < 10MB,最多建议不超过 200MB),可以将小表广播到所有 Executor,避免大表 Shuffle。
# 广播 Join:显式广播(推荐,明确意图)
result = df_large.join(
F.broadcast(df_small),
on="category_id",
how="left"
)
# 配置自动广播阈值(默认 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m") # 100MB 以下自动广播
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") # 禁用自动广播
持久化:cache() / persist()
当一个 DataFrame 被多次使用(如先聚合再 Join 再过滤),应将其持久化(缓存),避免重复计算。
| 存储级别 | 说明 | 适用场景 |
|---|---|---|
| MEMORY_ONLY | 只在内存(Java对象) | 数据量小,内存足够 |
| MEMORY_AND_DISK | 内存不足时溢出到磁盘 | 默认推荐(cache()的默认级别) |
| MEMORY_ONLY_SER | 序列化后存内存,节省空间 | 内存紧张但 CPU 充裕 |
| DISK_ONLY | 只存磁盘 | 数据极大,访问频率低 |
| OFF_HEAP | 堆外内存 | 避免 GC 影响 |
from pyspark.storagelevel import StorageLevel
df.cache() # 等价于 persist(MEMORY_AND_DISK)
df.persist(StorageLevel.MEMORY_AND_DISK_SER) # 序列化存储(节省内存)
df.unpersist() # 释放缓存(用完后记得释放)
# cache 在触发 Action 后才真正写入内存
df_cached = df.filter(...).cache()
df_cached.count() # 触发缓存
AQE:自适应查询优化(Spark 3.x)
AQE(Adaptive Query Execution)是 Spark 3.0 引入的重要特性,让 Spark 在运行时根据真实统计数据动态调整执行计划,无需手动调优。
- 动态合并 Shuffle 分区:Shuffle 后如果某些分区数据量很小,AQE 自动将它们合并,减少小分区 Task 的调度开销
- 动态切换 Join 策略:如果 Join 的一侧实际数据量比估算小(触发广播阈值),AQE 自动切换为 Broadcast Hash Join
- 动态优化数据倾斜:自动检测 Join 中的倾斜分区,并将其拆分为多个小分区处理
# AQE 配置(Spark 3.x 默认开启)
spark.conf.set("spark.sql.adaptive.enabled", "true") # 开启 AQE
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") # 动态合并分区
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true") # 自动处理倾斜 Join
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
内存配置
# spark-submit 关键内存参数
spark-submit \
--driver-memory 8g \ # Driver 内存(collect 大量数据时需要增大)
--executor-memory 16g \ # 每个 Executor 内存
--executor-cores 4 \ # 每个 Executor 的 CPU 核数
--num-executors 20 \ # Executor 数量(YARN 模式)
--conf spark.memory.fraction=0.8 \ # JVM 堆中用于 Spark 的比例(默认 0.6)
--conf spark.memory.storageFraction=0.5 \ # Spark 内存中用于缓存的比例
--conf spark.sql.shuffle.partitions=400 \ # Shuffle 后的分区数(根据数据量调整)
app.py
调优黄金法则:先用 Spark UI 定位瓶颈(慢在哪个 Stage/Task),再针对性解决。不要盲目调大内存——很多性能问题是数据倾斜或分区数不合理导致的,加内存无济于事。