KafkaConsumer 完整配置
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
Properties props = new Properties();
// ── 必填配置 ──
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processor-v2"); // 消费组 ID
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
// ── Offset 策略 ──
props.put("auto.offset.reset", "earliest"); // 无提交记录时从最早开始(earliest/latest/none)
props.put("enable.auto.commit", "false"); // 关闭自动提交,手动控制
// ── 拉取调优 ──
props.put("max.poll.records", 500); // 每次 poll 最多返回 500 条
props.put("fetch.min.bytes", 1024); // 最少等 1KB 数据再返回
props.put("fetch.max.wait.ms", 500); // 最多等 500ms
// ── 心跳与超时 ──
props.put("session.timeout.ms", 45000); // Consumer 心跳超时(默认 45s)
props.put("heartbeat.interval.ms", 3000); // 心跳间隔(应 < session.timeout/3)
props.put("max.poll.interval.ms", 300000); // 两次 poll 间最大间隔(5分钟)
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
Poll 循环与手动 Offset 提交
try {
while (true) {
// poll 超时时间:等待最多 100ms,若有数据立即返回
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) continue;
// 按分区处理(保证分区内处理顺序)
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partRecords =
records.records(partition);
for (ConsumerRecord<String, String> record : partRecords) {
// 业务处理
processOrder(record.value());
}
// 按分区手动提交 Offset(批量提交更高效)
long lastOffset = partRecords.get(partRecords.size() - 1).offset();
consumer.commitSync(Map.of(
partition, new OffsetAndMetadata(lastOffset + 1)
// 注意:提交的是"下次要读取的 Offset",即 lastOffset + 1
));
}
}
} catch (WakeupException e) {
// 正常关闭信号(由 consumer.wakeup() 触发)
} finally {
consumer.close(); // 关闭前发送最终 Offset 提交并通知 Group Coordinator
}
// 优雅关闭(在 ShutdownHook 中调用)
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
commitSync vs commitAsync
| 方式 | 行为 | 适用场景 |
|---|---|---|
commitSync() | 同步等待 Broker 确认,失败自动重试 | 关键业务,必须确保 Offset 提交成功 |
commitAsync(callback) | 异步提交,不阻塞,失败不重试(可能乱序) | 高吞吐场景,与最终 commitSync() 配合 |
分区分配策略
三种分区分配策略对比(4分区,3个Consumer)
Range 策略(按分区范围连续分配):
Consumer-0: P0, P1
Consumer-1: P2, P3
Consumer-2: (空闲)
RoundRobin 策略(轮询分配):
Consumer-0: P0, P3
Consumer-1: P1
Consumer-2: P2
Sticky 策略(首次轮询,Rebalance 后保持原有分配):
Consumer-0: P0, P3 ← Rebalance 后尽量不变
Consumer-1: P1 ← 减少状态迁移开销
Consumer-2: P2
// 设置分区分配策略
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.StickyAssignor");
// 多策略配置(按优先级尝试)
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// CooperativeStickyAssignor(Kafka 2.4+):增量式 Rebalance,
// 未受影响的分区不停止消费,大幅减少 Rebalance 影响
Rebalance:触发时机与优化
Rebalance 是 Consumer Group 中分区重新分配的过程,期间所有 Consumer 停止消费,直到分配完成。触发时机:
- 新的 Consumer 加入 Group
- Consumer 实例退出(正常关闭或心跳超时)
- Topic 分区数发生变化
- Consumer 调用
subscribe()修改订阅的 Topic
减少 Rebalance 的实践
// 1. 增大心跳超时,防止短暂 GC 触发误判
props.put("session.timeout.ms", 60000); // 60秒(默认45秒)
props.put("heartbeat.interval.ms", 5000); // 5秒心跳间隔
// 2. 控制 poll 处理时间,防止超过 max.poll.interval.ms
props.put("max.poll.records", 200); // 减少单次处理量
props.put("max.poll.interval.ms", 600000); // 增大处理超时(重度处理场景)
// 3. Rebalance Listener:处理分区撤销时保存状态
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 分区被撤销前:提交当前 Offset,保存本地状态
consumer.commitSync(currentOffsets);
saveLocalState(partitions);
log.info("分区被撤销,已提交 Offset: {}", partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 新分区分配后:加载本地状态,准备消费
loadLocalState(partitions);
log.info("新分区分配: {}", partitions);
}
});
Consumer Lag 监控
# 命令行查看 Consumer Group 的 Lag
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe --group order-processor-v2
# 输出示例:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# order-processor-v2 orders 0 1050 1055 5
# order-processor-v2 orders 1 2000 2000 0
# order-processor-v2 orders 2 980 990 10
# → 分区0 Lag=5, 分区2 Lag=10,消费稍有落后
Python Consumer 实战(confluent-kafka)
from confluent_kafka import Consumer, KafkaError, KafkaException
import json, signal, sys
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processor-py',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False, # 手动提交
'max.poll.interval.ms': 300000,
'session.timeout.ms': 45000,
})
running = True
def shutdown(sig, frame):
global running
running = False
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)
consumer.subscribe(['orders'])
try:
while running:
# 拉取消息,超时 1 秒
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print(f'已到分区末尾: {msg.topic()} [{msg.partition()}]')
else:
raise KafkaException(msg.error())
continue
# 正常消息处理
order = json.loads(msg.value().decode('utf-8'))
print(f'处理订单: {order["id"]}, 金额: {order["amount"]}')
# 手动提交 Offset
consumer.commit(msg, asynchronous=False)
finally:
consumer.close() # 提交剩余 Offset,通知 Group Coordinator
print('Consumer 已关闭')
多线程消费模式
KafkaConsumer 不是线程安全的:不能在多个线程中共享同一个 Consumer 实例。多线程消费有两种主流模式:
模式一:每线程一个 Consumer(推荐)
// 每个线程拥有独立的 KafkaConsumer,最简单可靠
ExecutorService executor = Executors.newFixedThreadPool(partitionCount);
for (int i = 0; i < partitionCount; i++) {
executor.submit(new ConsumerWorker("orders", "group-id", kafkaProps));
}
模式二:单 Consumer + 线程池处理
// 单 Consumer 负责 poll,业务处理交给线程池(解耦 I/O 和 CPU 密集处理)
BlockingQueue<ConsumerRecord<String, String>> queue =
new LinkedBlockingQueue<>(10000);
// Consumer 线程:只负责 poll 和入队
while (running) {
for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
queue.put(record); // 阻塞直到队列有空位
}
// 注意:Offset 提交需要在处理完成后谨慎管理,避免乱序提交
}