Chapter 06

Spark Streaming 结构化流

用批处理的思维处理流数据——Structured Streaming 让实时计算像写 SQL 一样简单

Structured Streaming vs DStream(旧)

Spark 2.0 推出了 Structured Streaming,以替代 Spark 1.x 的 DStream API。两者在编程模型上有根本性差异:

特性DStream(旧,已废弃)Structured Streaming(推荐)
抽象模型离散化微批 RDD 序列无界的流式 DataFrame/Table
APIRDD 操作,低级DataFrame/SQL,与批处理相同
端到端一致性至少一次(At-Least-Once)精确一次(Exactly-Once)
事件时间处理不原生支持原生支持,Watermark 处理延迟
状态管理updateStateByKey,复杂且有限mapGroupsWithState,更灵活
维护状态已废弃,不再维护活跃开发中

不要使用 DStream:DStream API 在 Spark 3.4+ 已标记为废弃(Deprecated),新项目一律使用 Structured Streaming。

流 DataFrame 抽象

Structured Streaming 的核心思想是:将流数据视为一张不断追加行的无界表(Unbounded Table)。每次触发时,处理新增的数据,将结果写到输出表。

Structured Streaming 的无界表模型 输入流(持续追加行): ┌────────────────────────────────────────────┐ │ t=0: [event1, event2, event3] │ │ t=1: [event4, event5] ← 新增 │ │ t=2: [event6, event7, event8] ← 新增 │ │ ... │ └─────────────────────┬──────────────────────┘ │ 每个触发间隔处理新增数据 ┌────────────────────────────────────────────┐ │ 查询逻辑(与批处理 DataFrame 完全相同) │ │ df.filter().groupBy().agg() │ └─────────────────────┬──────────────────────┘ 输出到 Sink(Kafka/文件/内存)

从 Kafka 读取流数据

# 创建流 DataFrame(读取 Kafka)
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest") \   # 从最新消息开始
    .option("maxOffsetsPerTrigger", "10000") \  # 每批最多处理10000条
    .load()

# 解析消息
event_schema = StructType([
    StructField("user_id",  LongType(),      True),
    StructField("event",    StringType(),    True),
    StructField("amount",   DoubleType(),    True),
    StructField("ts",       TimestampType(), True),
])

parsed = stream_df.select(
    F.from_json(F.col("value").cast("string"), event_schema).alias("data")
).select("data.*")

触发器(Trigger)

触发器决定何时开始处理新数据:

输出模式(Output Mode)

Watermark:处理延迟数据

现实中,数据经常因网络延迟等原因迟到。Watermark(水印)机制让 Spark 知道可以安全地丢弃多久以前的延迟数据,从而控制状态大小。

# Watermark:允许最多 10 分钟的延迟数据
windowed_counts = parsed \
    .withWatermark("ts", "10 minutes") \     # ts 是事件时间列,最大延迟 10 分钟
    .groupBy(
        F.window(F.col("ts"), "5 minutes"),  # 5 分钟滚动窗口
        F.col("event")
    ) \
    .agg(F.count("*").alias("cnt"))

# 去重:基于事件时间的 dropDuplicates(避免 Kafka 重试造成重复)
deduped = parsed \
    .withWatermark("ts", "1 hour") \
    .dropDuplicates(["event_id", "ts"])

实战:Kafka 实时日志分析

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *

spark = SparkSession.builder.appName("RealtimeLogAnalysis").getOrCreate()

# 1. 读取 Kafka 流
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "nginx-access-log") \
    .load()

log_schema = StructType([
    StructField("ip",           StringType(),    True),
    StructField("method",       StringType(),    True),
    StructField("url",          StringType(),    True),
    StructField("status_code",  IntegerType(),   True),
    StructField("response_ms",  LongType(),      True),
    StructField("ts",           TimestampType(), True),
])

# 2. 解析 JSON 日志
logs = raw_stream.select(
    F.from_json(F.col("value").cast("string"), log_schema).alias("log")
).select("log.*")

# 3. 每分钟 HTTP 状态码统计
status_counts = logs \
    .withWatermark("ts", "2 minutes") \
    .groupBy(
        F.window("ts", "1 minute"),
        "status_code"
    ) \
    .agg(
        F.count("*").alias("req_count"),
        F.avg("response_ms").alias("avg_latency_ms"),
    )

# 4. 写入 Kafka(输出每分钟统计结果)
query = status_counts \
    .select(
        F.to_json(F.struct("*")).alias("value")
    ) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "log-stats") \
    .option("checkpointLocation", "s3://bucket/checkpoints/log-stats") \
    .outputMode("update") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()  # 阻塞,持续运行直到手动停止

Checkpoint 的重要性checkpointLocation 存储流查询的状态和进度(Kafka offset)。如果作业重启,Spark 会从 checkpoint 恢复,不会重复处理或遗漏数据。生产环境必须配置,且推荐存储在 HDFS 或 S3 等可靠存储上。