MapReduce 的问题
在 Spark 出现之前,Hadoop MapReduce 是大数据处理的事实标准。MapReduce 的核心思想是将计算拆分为 Map(映射)和 Reduce(归约)两个阶段,中间结果必须写入 HDFS 磁盘。
MapReduce 的三大痛点
- 磁盘 I/O 开销巨大:每个 Map/Reduce 阶段的中间结果都要写磁盘,迭代算法(如机器学习的梯度下降)需要数百轮迭代,产生海量磁盘读写
- 不支持迭代计算:两阶段固定模型无法自然表达图算法、机器学习等需要反复迭代的计算
- 编程模型繁琐:一个简单的 WordCount 需要数十行 Java 代码,复杂查询往往需要多个 MapReduce Job 串行执行
MapReduce vs Spark 的中间数据处理对比
MapReduce(每步都写磁盘):
Input → Map → [HDFS写] → Shuffle → Reduce → [HDFS写] → Map → [HDFS写] → ...
Spark(内存缓存,按需持久化):
Input → Transform → [内存RDD] → Transform → [内存RDD] → Action → Output
迭代1 迭代2 迭代3 (只有最终结果写磁盘)
Spark 的诞生与核心思想
Apache Spark 由 Matei Zaharia 在 UC Berkeley AMPLab 于 2009 年创建,2013 年捐献给 Apache 基金会。Spark 的核心创新是 弹性分布式数据集(RDD):将数据集以分区形式分布在集群内存中,避免重复的磁盘读写,使迭代算法性能提升 10~100 倍。
Spark 的核心理念:把计算带到数据所在节点(移动计算而非移动数据),在内存中缓存中间结果,用 DAG 调度引擎优化整个计算流程。
核心架构:Driver / Executor / Cluster Manager
Spark 集群架构总览
┌─────────────────────────────────────────────────────────────┐
│ Driver Program │
│ SparkContext / SparkSession │
│ ┌──────────────┐ ┌──────────────┐ ┌─────────────────┐ │
│ │ DAG Scheduler│ │Task Scheduler│ │ Block Manager │ │
│ └──────┬───────┘ └──────┬───────┘ └─────────────────┘ │
└──────────┼─────────────────┼──────────────────────────────┘
│ 申请资源 │ 分发 Task
▼ ▼
┌──────────────────┐ ┌─────────────────────────────────────┐
│ Cluster Manager │ │ Worker Node │
│ (YARN/K8s/ │ │ ┌──────────┐ ┌──────────────┐ │
│ Standalone) │ │ │ Executor │ │ Executor │ │
│ │ │ │ Task Task│ │ Task Task │ │
│ │ │ │ [内存缓存]│ │ [内存缓存] │ │
└──────────────────┘ │ └──────────┘ └──────────────┘ │
└─────────────────────────────────────┘
- Driver Spark 应用的主进程,运行用户的 main() 函数。负责将用户代码转换为 DAG 计划、向 Cluster Manager 申请资源、将 Task 分发给 Executor,并收集结果。Driver 挂掉整个 Application 失败。
- Executor 运行在 Worker 节点上的 JVM 进程,负责执行 Task 并将中间结果缓存在内存(或磁盘)中。每个 Spark Application 独享一组 Executor,Application 结束后 Executor 销毁。
- Cluster Manager 集群资源管理器,负责分配 CPU/内存资源给 Driver 和 Executor。支持 Standalone(内置)、Apache YARN(Hadoop 生态)、Kubernetes(现代云原生)、Apache Mesos(已逐渐淘汰)。
-
SparkSession
Spark 2.0+ 统一的入口点,整合了旧版的 SparkContext、SQLContext、HiveContext。通过
SparkSession.builder创建,一个应用通常只创建一个实例。
执行模型:DAG / Stage / Task / Shuffle
Spark 将用户的转换操作构建成一个 有向无环图(DAG),再由调度器将其分解为 Stage 和 Task 提交给 Executor 执行。
名词解释
- Job 由一个 Action 操作触发的一次完整计算。一个 Spark 应用可以产生多个 Job。
- Stage Job 被 Shuffle 边界切分成若干 Stage。同一个 Stage 内的所有 Task 可以流水线并行执行,不需要等待其他 Stage 完成。
- Task Stage 内针对一个数据分区的最小执行单元。一个 Stage 有多少个分区,就会产生多少个并行 Task。
- Shuffle 跨节点的数据重新分区操作,是 Spark 中最昂贵的操作(涉及磁盘写入和网络传输)。groupBy、join、repartition 等都会触发 Shuffle,形成 Stage 边界。
- 惰性求值 Transformation 操作(map/filter/select 等)只是记录计划,不立即执行。只有当 Action 操作(collect/count/save 等)被调用时,Spark 才真正执行 DAG 中的所有计算。这使得 Spark 有机会对整个计划做全局优化。
- 宽依赖 vs 窄依赖 窄依赖(Narrow Dependency):父 RDD 的每个分区只被子 RDD 的一个分区使用,如 map/filter,可以流水线执行。宽依赖(Wide Dependency):父 RDD 的一个分区被子 RDD 的多个分区使用,必须触发 Shuffle,如 groupByKey/reduceByKey/join。
RDD vs DataFrame vs Dataset 演进
Spark 经过多年发展,形成了三代 API,从底层到高层逐步封装:
| API | 引入版本 | 类型安全 | 性能优化 | 推荐场景 |
|---|---|---|---|---|
| RDD | Spark 1.0 | 编译期安全(Scala) | 无 Catalyst/Tungsten 优化 | 底层控制、自定义序列化 |
| DataFrame | Spark 1.3 | 运行期检查 | Catalyst 优化器 + Tungsten 内存管理 | 数据工程、SQL 分析(推荐) |
| Dataset | Spark 1.6 | 编译期安全(Scala/Java) | 同 DataFrame(本质是类型化 DataFrame) | Scala/Java 强类型业务代码 |
Python 用户注意:PySpark 只有 DataFrame API(Python 没有编译期类型),Dataset API 仅在 Scala/Java 中有意义。绝大多数数据工程师使用 DataFrame API 就够了。
部署模式
| 模式 | 适用场景 | 特点 |
|---|---|---|
| Local[n] | 本地开发/测试 | 单机运行,n 为线程数,local[*] 使用全部 CPU |
| Standalone | 小型私有集群 | Spark 自带的轻量级 Cluster Manager,无需 Hadoop |
| YARN | Hadoop 生态 | 与 HDFS 深度集成,企业最常见的部署方式 |
| Kubernetes | 云原生/容器化 | Spark 3.1+ 生产就绪,弹性伸缩,现代首选 |
| Databricks | 托管云平台 | 全托管 Spark,Delta Lake 原生支持,无需运维 |
安装 PySpark + JupyterLab
# 前置条件:Java 11 或 17(Spark 3.3+ 推荐)
java -version # 确认已安装
# 安装 PySpark 和 JupyterLab
pip install pyspark==3.5.1 jupyterlab findspark
# 启动 JupyterLab
jupyter lab
# 或者用 Docker(最简单,无需本地 Java)
docker run -it --rm \
-p 8888:8888 \
-p 4040:4040 \
jupyter/pyspark-notebook:spark-3.5.0
# 第一个 PySpark 程序
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("HelloSpark") \
.master("local[*]") \
.getOrCreate()
# Spark UI 运行在 http://localhost:4040
print(f"Spark 版本: {spark.version}")
# 创建一个简单的 DataFrame
data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])
df.show()
# +-------+---+
# | name|age|
# +-------+---+
# | Alice| 30|
# | Bob| 25|
# |Charlie| 35|
# +-------+---+
spark.stop()
Spark UI(端口 4040):每个 Spark 应用运行时会启动 Web UI,可以实时查看 Job、Stage、Task 的执行状态、数据量、耗时分布。调优时这是最重要的工具,务必养成查看 Spark UI 的习惯。