Kafka Streams 简介
Kafka Streams 是 Apache Kafka 内置的嵌入式流处理库(Java/Scala)。与 Flink/Spark 等独立流处理集群不同,Kafka Streams 作为普通 Java 依赖运行在业务应用内,无需部署额外基础设施。
Kafka Streams 优势
无独立集群、弹性扩容(部署多个实例即可)、精确一次语义、状态自动容错恢复
适用场景
实时数据转换、聚合统计、流式 Join、状态机更新,单 Topic 消费处理再输出
核心抽象:KStream 与 KTable
- KStream 无界数据流,每条消息代表一个独立事件(INSERT 语义)。适合日志、点击事件等流式数据。同一个 Key 的多条消息独立存在。
- KTable 有界状态表,每条消息代表该 Key 的最新值(UPDATE/UPSERT 语义)。适合用户最新配置、商品库存等状态数据。新消息覆盖相同 Key 的旧消息。
- GlobalKTable 与 KTable 类似,但所有 Streams 实例都加载完整数据(而非仅自己负责的分区)。适合小型维度表(城市、品类等),支持非 Key Join。
KStream vs KTable 语义差异
输入 Topic(按时间顺序):
[k1:v1] [k2:v2] [k1:v3] [k3:v4] [k1:v5]
KStream 视角(每条消息独立):
(k1,v1) → (k2,v2) → (k1,v3) → (k3,v4) → (k1,v5)
↑ k1 出现了3次,互相独立
KTable 视角(每 Key 取最新值):
最终状态: { k1: v5, k2: v2, k3: v4 }
↑ k1 只保留最新的 v5
常用操作算子
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> orders =
builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));
// ── filter:过滤金额大于 100 的订单 ──
KStream<String, String> bigOrders = orders
.filter((key, value) -> {
Order order = Order.parse(value);
return order.getAmount() > 100.0;
});
// ── map:提取用户 ID 作为新 Key ──
KStream<String, Double> userOrders = orders
.map((orderId, value) -> {
Order order = Order.parse(value);
return KeyValue.pair(order.getUserId(), order.getAmount());
});
// ── flatMap:一条消息拆分成多条(如订单包含多个商品)──
KStream<String, String> items = orders
.flatMap((key, value) -> {
Order order = Order.parse(value);
return order.getItems().stream()
.map(item -> KeyValue.pair(item.getSkuId(), item.toString()))
.collect(Collectors.toList());
});
// ── 分支(split):按条件路由到不同 Topic ──
Map<String, KStream<String, String>> branches = orders
.split(Named.as("branch-"))
.branch((k, v) -> Order.parse(v).isVip(), Branched.as("vip"))
.branch((k, v) -> true, Branched.as("normal"))
.defaultBranch();
branches.get("branch-vip").to("vip-orders");
branches.get("branch-normal").to("normal-orders");
状态聚合(Stateful Aggregation)
// 按用户 ID 统计总消费金额(KTable 聚合)
KTable<String, Double> userTotalSpend = userOrders
.groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
.aggregate(
() -> 0.0, // 初始值
(userId, amount, total) -> total + amount, // 聚合函数
Materialized.<String, Double>as("user-spend-store") // 状态存储名称
.withValueSerde(Serdes.Double())
);
// 输出到 Topic
userTotalSpend.toStream().to("user-spend-summary");
// 查询本地状态存储(Interactive Queries)
ReadOnlyKeyValueStore<String, Double> store =
streams.store(StoreQueryParameters.fromNameAndType(
"user-spend-store", QueryableStoreTypes.keyValueStore()));
Double total = store.get("user-123"); // 实时查询某用户总消费
窗口操作
// ── Tumbling Window(滚动窗口):不重叠的固定时间窗口 ──
KTable<Windowed<String>, Long> orderCount1Min = orders
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.count();
// ── Hopping Window(跳跃窗口):重叠的固定时间窗口 ──
// 每30秒统计一次过去5分钟的数据
KTable<Windowed<String>, Double> rollingRevenue = userOrders
.groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
.windowedBy(TimeWindows
.ofSizeWithNoGrace(Duration.ofMinutes(5))
.advanceBy(Duration.ofSeconds(30)))
.reduce(Double::sum);
// ── Session Window(会话窗口):活跃间隔超过阈值则结束会话 ──
KTable<Windowed<String>, Long> sessions = orders
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
.count();
// 用户 30 分钟无操作则结束当前 session,统计 session 内操作次数
三种窗口类型对比(时间轴)
Tumbling Window(每1分钟):
|─────W1─────|─────W2─────|─────W3─────|
0 60s 120s 180s
Hopping Window(5分钟宽度,30秒步长):
|───────────────W1───────────────|
|───────────────W2───────────────|
|───────────────W3───────────────|
Session Window(30分钟无活动则结束):
[e1 e2 e3]──gap≥30m──[e4 e5]──gap≥30m──[e6]
───S1──── ──S2── S3
实战:实时统计每分钟订单金额
// 完整示例:每分钟聚合一次各品类的订单总额,输出到 order-revenue-1min
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "revenue-aggregator");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// RocksDB 状态存储目录
streamsProps.put(StreamsConfig.STATE_DIR_CONFIG, "/data/kafka-streams");
StreamsBuilder builder = new StreamsBuilder();
builder
.stream<String, String>("orders")
// 提取 category 作为新 Key,amount 作为 Value
.map((k, v) -> {
Order o = Order.fromJson(v);
return KeyValue.pair(o.getCategory(), o.getAmount().toString());
})
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.aggregate(
() -> "0.0",
(cat, amtStr, total) -> String.valueOf(Double.parseDouble(total) + Double.parseDouble(amtStr))
)
.toStream()
// 窗口 Key 转换为可读字符串:category@windowStart-windowEnd
.map((wk, v) -> KeyValue.pair(
wk.key() + "@" + wk.window().startTime() + "-" + wk.window().endTime(),
v
))
.to("order-revenue-1min");
KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);
streams.start();
// 优雅关闭
Runtime.getRuntime().addShutdownHook(new Thread(() ->
streams.close(Duration.ofSeconds(5))
));
状态容错恢复:Kafka Streams 的 RocksDB 状态存储会定期将增量变更备份到 Kafka 内部的 changelog Topic(application-id-store-name-changelog)。节点故障重启后,Streams 会自动从 changelog 恢复状态,通常在秒级内完成恢复,无需外部数据库支持。