Chapter 06

Kafka Streams 流处理

嵌入式流处理引擎——无需独立集群,在应用内实现实时数据转换与聚合

Kafka Streams 简介

Kafka Streams 是 Apache Kafka 内置的嵌入式流处理库(Java/Scala)。与 Flink/Spark 等独立流处理集群不同,Kafka Streams 作为普通 Java 依赖运行在业务应用内,无需部署额外基础设施。

Kafka Streams 优势

无独立集群、弹性扩容(部署多个实例即可)、精确一次语义、状态自动容错恢复

🚀

适用场景

实时数据转换、聚合统计、流式 Join、状态机更新,单 Topic 消费处理再输出

核心抽象:KStream 与 KTable

KStream vs KTable 语义差异 输入 Topic(按时间顺序): [k1:v1] [k2:v2] [k1:v3] [k3:v4] [k1:v5] KStream 视角(每条消息独立): (k1,v1) → (k2,v2) → (k1,v3) → (k3,v4) → (k1,v5) ↑ k1 出现了3次,互相独立 KTable 视角(每 Key 取最新值): 最终状态: { k1: v5, k2: v2, k3: v4 } ↑ k1 只保留最新的 v5

常用操作算子

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> orders =
    builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));

// ── filter:过滤金额大于 100 的订单 ──
KStream<String, String> bigOrders = orders
    .filter((key, value) -> {
        Order order = Order.parse(value);
        return order.getAmount() > 100.0;
    });

// ── map:提取用户 ID 作为新 Key ──
KStream<String, Double> userOrders = orders
    .map((orderId, value) -> {
        Order order = Order.parse(value);
        return KeyValue.pair(order.getUserId(), order.getAmount());
    });

// ── flatMap:一条消息拆分成多条(如订单包含多个商品)──
KStream<String, String> items = orders
    .flatMap((key, value) -> {
        Order order = Order.parse(value);
        return order.getItems().stream()
            .map(item -> KeyValue.pair(item.getSkuId(), item.toString()))
            .collect(Collectors.toList());
    });

// ── 分支(split):按条件路由到不同 Topic ──
Map<String, KStream<String, String>> branches = orders
    .split(Named.as("branch-"))
    .branch((k, v) -> Order.parse(v).isVip(), Branched.as("vip"))
    .branch((k, v) -> true,                   Branched.as("normal"))
    .defaultBranch();

branches.get("branch-vip").to("vip-orders");
branches.get("branch-normal").to("normal-orders");

状态聚合(Stateful Aggregation)

// 按用户 ID 统计总消费金额(KTable 聚合)
KTable<String, Double> userTotalSpend = userOrders
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
    .aggregate(
        () -> 0.0,                          // 初始值
        (userId, amount, total) -> total + amount, // 聚合函数
        Materialized.<String, Double>as("user-spend-store") // 状态存储名称
            .withValueSerde(Serdes.Double())
    );

// 输出到 Topic
userTotalSpend.toStream().to("user-spend-summary");

// 查询本地状态存储(Interactive Queries)
ReadOnlyKeyValueStore<String, Double> store =
    streams.store(StoreQueryParameters.fromNameAndType(
        "user-spend-store", QueryableStoreTypes.keyValueStore()));

Double total = store.get("user-123"); // 实时查询某用户总消费

窗口操作

// ── Tumbling Window(滚动窗口):不重叠的固定时间窗口 ──
KTable<Windowed<String>, Long> orderCount1Min = orders
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
    .count();

// ── Hopping Window(跳跃窗口):重叠的固定时间窗口 ──
// 每30秒统计一次过去5分钟的数据
KTable<Windowed<String>, Double> rollingRevenue = userOrders
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
    .windowedBy(TimeWindows
        .ofSizeWithNoGrace(Duration.ofMinutes(5))
        .advanceBy(Duration.ofSeconds(30)))
    .reduce(Double::sum);

// ── Session Window(会话窗口):活跃间隔超过阈值则结束会话 ──
KTable<Windowed<String>, Long> sessions = orders
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count();
// 用户 30 分钟无操作则结束当前 session,统计 session 内操作次数
三种窗口类型对比(时间轴) Tumbling Window(每1分钟): |─────W1─────|─────W2─────|─────W3─────| 0 60s 120s 180s Hopping Window(5分钟宽度,30秒步长): |───────────────W1───────────────| |───────────────W2───────────────| |───────────────W3───────────────| Session Window(30分钟无活动则结束): [e1 e2 e3]──gap≥30m──[e4 e5]──gap≥30m──[e6] ───S1──── ──S2── S3

实战:实时统计每分钟订单金额

// 完整示例:每分钟聚合一次各品类的订单总额,输出到 order-revenue-1min

Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "revenue-aggregator");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// RocksDB 状态存储目录
streamsProps.put(StreamsConfig.STATE_DIR_CONFIG, "/data/kafka-streams");

StreamsBuilder builder = new StreamsBuilder();

builder
    .stream<String, String>("orders")
    // 提取 category 作为新 Key,amount 作为 Value
    .map((k, v) -> {
        Order o = Order.fromJson(v);
        return KeyValue.pair(o.getCategory(), o.getAmount().toString());
    })
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
    .aggregate(
        () -> "0.0",
        (cat, amtStr, total) -> String.valueOf(Double.parseDouble(total) + Double.parseDouble(amtStr))
    )
    .toStream()
    // 窗口 Key 转换为可读字符串:category@windowStart-windowEnd
    .map((wk, v) -> KeyValue.pair(
        wk.key() + "@" + wk.window().startTime() + "-" + wk.window().endTime(),
        v
    ))
    .to("order-revenue-1min");

KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);
streams.start();

// 优雅关闭
Runtime.getRuntime().addShutdownHook(new Thread(() ->
    streams.close(Duration.ofSeconds(5))
));

状态容错恢复:Kafka Streams 的 RocksDB 状态存储会定期将增量变更备份到 Kafka 内部的 changelog Topic(application-id-store-name-changelog)。节点故障重启后,Streams 会自动从 changelog 恢复状态,通常在秒级内完成恢复,无需外部数据库支持。