Structured Streaming vs DStream(旧)
Spark 2.0 推出了 Structured Streaming,以替代 Spark 1.x 的 DStream API。两者在编程模型上有根本性差异:
| 特性 | DStream(旧,已废弃) | Structured Streaming(推荐) |
|---|---|---|
| 抽象模型 | 离散化微批 RDD 序列 | 无界的流式 DataFrame/Table |
| API | RDD 操作,低级 | DataFrame/SQL,与批处理相同 |
| 端到端一致性 | 至少一次(At-Least-Once) | 精确一次(Exactly-Once) |
| 事件时间处理 | 不原生支持 | 原生支持,Watermark 处理延迟 |
| 状态管理 | updateStateByKey,复杂且有限 | mapGroupsWithState,更灵活 |
| 维护状态 | 已废弃,不再维护 | 活跃开发中 |
不要使用 DStream:DStream API 在 Spark 3.4+ 已标记为废弃(Deprecated),新项目一律使用 Structured Streaming。
流 DataFrame 抽象
Structured Streaming 的核心思想是:将流数据视为一张不断追加行的无界表(Unbounded Table)。每次触发时,处理新增的数据,将结果写到输出表。
Structured Streaming 的无界表模型
输入流(持续追加行):
┌────────────────────────────────────────────┐
│ t=0: [event1, event2, event3] │
│ t=1: [event4, event5] ← 新增 │
│ t=2: [event6, event7, event8] ← 新增 │
│ ... │
└─────────────────────┬──────────────────────┘
│ 每个触发间隔处理新增数据
▼
┌────────────────────────────────────────────┐
│ 查询逻辑(与批处理 DataFrame 完全相同) │
│ df.filter().groupBy().agg() │
└─────────────────────┬──────────────────────┘
▼
输出到 Sink(Kafka/文件/内存)
从 Kafka 读取流数据
# 创建流 DataFrame(读取 Kafka)
stream_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "user-events") \
.option("startingOffsets", "latest") \ # 从最新消息开始
.option("maxOffsetsPerTrigger", "10000") \ # 每批最多处理10000条
.load()
# 解析消息
event_schema = StructType([
StructField("user_id", LongType(), True),
StructField("event", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("ts", TimestampType(), True),
])
parsed = stream_df.select(
F.from_json(F.col("value").cast("string"), event_schema).alias("data")
).select("data.*")
触发器(Trigger)
触发器决定何时开始处理新数据:
-
ProcessingTime
最常用。按固定时间间隔触发,如每 30 秒处理一批。
trigger(processingTime='30 seconds') -
Once
只处理一次当前所有可用数据,处理完后停止。适合定时批量任务场景。
trigger(once=True) -
AvailableNow
Spark 3.3+ 推出。处理所有当前可用数据(可能多批),处理完后停止。比 Once 更高效。
trigger(availableNow=True) -
Continuous
实验性功能。毫秒级延迟的连续处理模式,支持精确一次语义。
trigger(continuous='1 second')
输出模式(Output Mode)
- Append(默认) 只输出本批次新增的行,不会修改已输出的行。适合无聚合或 Watermark 聚合。写入 Kafka/文件最常用。
- Update 只输出本批次被更新(包括新增)的行。适合有聚合的场景(如实时计数),不支持排序。
- Complete 每次触发都输出全量结果表。适合聚合后结果行数少的场景(如全局 TOP N),内存开销大。
Watermark:处理延迟数据
现实中,数据经常因网络延迟等原因迟到。Watermark(水印)机制让 Spark 知道可以安全地丢弃多久以前的延迟数据,从而控制状态大小。
# Watermark:允许最多 10 分钟的延迟数据
windowed_counts = parsed \
.withWatermark("ts", "10 minutes") \ # ts 是事件时间列,最大延迟 10 分钟
.groupBy(
F.window(F.col("ts"), "5 minutes"), # 5 分钟滚动窗口
F.col("event")
) \
.agg(F.count("*").alias("cnt"))
# 去重:基于事件时间的 dropDuplicates(避免 Kafka 重试造成重复)
deduped = parsed \
.withWatermark("ts", "1 hour") \
.dropDuplicates(["event_id", "ts"])
实战:Kafka 实时日志分析
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
spark = SparkSession.builder.appName("RealtimeLogAnalysis").getOrCreate()
# 1. 读取 Kafka 流
raw_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "nginx-access-log") \
.load()
log_schema = StructType([
StructField("ip", StringType(), True),
StructField("method", StringType(), True),
StructField("url", StringType(), True),
StructField("status_code", IntegerType(), True),
StructField("response_ms", LongType(), True),
StructField("ts", TimestampType(), True),
])
# 2. 解析 JSON 日志
logs = raw_stream.select(
F.from_json(F.col("value").cast("string"), log_schema).alias("log")
).select("log.*")
# 3. 每分钟 HTTP 状态码统计
status_counts = logs \
.withWatermark("ts", "2 minutes") \
.groupBy(
F.window("ts", "1 minute"),
"status_code"
) \
.agg(
F.count("*").alias("req_count"),
F.avg("response_ms").alias("avg_latency_ms"),
)
# 4. 写入 Kafka(输出每分钟统计结果)
query = status_counts \
.select(
F.to_json(F.struct("*")).alias("value")
) \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("topic", "log-stats") \
.option("checkpointLocation", "s3://bucket/checkpoints/log-stats") \
.outputMode("update") \
.trigger(processingTime="30 seconds") \
.start()
query.awaitTermination() # 阻塞,持续运行直到手动停止
Checkpoint 的重要性:checkpointLocation 存储流查询的状态和进度(Kafka offset)。如果作业重启,Spark 会从 checkpoint 恢复,不会重复处理或遗漏数据。生产环境必须配置,且推荐存储在 HDFS 或 S3 等可靠存储上。