Chapter 05

Consumer 开发与消费组

精准控制消费进度——从 Poll 循环到 Rebalance 优化的完整消费者开发指南

KafkaConsumer 完整配置

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();

// ── 必填配置 ──
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processor-v2");     // 消费组 ID
props.put("key.deserializer",   StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

// ── Offset 策略 ──
props.put("auto.offset.reset", "earliest"); // 无提交记录时从最早开始(earliest/latest/none)
props.put("enable.auto.commit", "false");    // 关闭自动提交,手动控制

// ── 拉取调优 ──
props.put("max.poll.records", 500);          // 每次 poll 最多返回 500 条
props.put("fetch.min.bytes", 1024);          // 最少等 1KB 数据再返回
props.put("fetch.max.wait.ms", 500);         // 最多等 500ms

// ── 心跳与超时 ──
props.put("session.timeout.ms", 45000);      // Consumer 心跳超时(默认 45s)
props.put("heartbeat.interval.ms", 3000);    // 心跳间隔(应 < session.timeout/3)
props.put("max.poll.interval.ms", 300000);  // 两次 poll 间最大间隔(5分钟)

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));

Poll 循环与手动 Offset 提交

try {
    while (true) {
        // poll 超时时间:等待最多 100ms,若有数据立即返回
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        if (records.isEmpty()) continue;

        // 按分区处理(保证分区内处理顺序)
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partRecords =
                records.records(partition);

            for (ConsumerRecord<String, String> record : partRecords) {
                // 业务处理
                processOrder(record.value());
            }

            // 按分区手动提交 Offset(批量提交更高效)
            long lastOffset = partRecords.get(partRecords.size() - 1).offset();
            consumer.commitSync(Map.of(
                partition, new OffsetAndMetadata(lastOffset + 1)
                // 注意:提交的是"下次要读取的 Offset",即 lastOffset + 1
            ));
        }
    }
} catch (WakeupException e) {
    // 正常关闭信号(由 consumer.wakeup() 触发)
} finally {
    consumer.close(); // 关闭前发送最终 Offset 提交并通知 Group Coordinator
}

// 优雅关闭(在 ShutdownHook 中调用)
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

commitSync vs commitAsync

方式行为适用场景
commitSync()同步等待 Broker 确认,失败自动重试关键业务,必须确保 Offset 提交成功
commitAsync(callback)异步提交,不阻塞,失败不重试(可能乱序)高吞吐场景,与最终 commitSync() 配合

分区分配策略

三种分区分配策略对比(4分区,3个Consumer) Range 策略(按分区范围连续分配): Consumer-0: P0, P1 Consumer-1: P2, P3 Consumer-2: (空闲) RoundRobin 策略(轮询分配): Consumer-0: P0, P3 Consumer-1: P1 Consumer-2: P2 Sticky 策略(首次轮询,Rebalance 后保持原有分配): Consumer-0: P0, P3 ← Rebalance 后尽量不变 Consumer-1: P1 ← 减少状态迁移开销 Consumer-2: P2
// 设置分区分配策略
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.StickyAssignor");
// 多策略配置(按优先级尝试)
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
    // CooperativeStickyAssignor(Kafka 2.4+):增量式 Rebalance,
    // 未受影响的分区不停止消费,大幅减少 Rebalance 影响

Rebalance:触发时机与优化

Rebalance 是 Consumer Group 中分区重新分配的过程,期间所有 Consumer 停止消费,直到分配完成。触发时机:

减少 Rebalance 的实践

// 1. 增大心跳超时,防止短暂 GC 触发误判
props.put("session.timeout.ms", 60000);   // 60秒(默认45秒)
props.put("heartbeat.interval.ms", 5000); // 5秒心跳间隔

// 2. 控制 poll 处理时间,防止超过 max.poll.interval.ms
props.put("max.poll.records", 200);         // 减少单次处理量
props.put("max.poll.interval.ms", 600000); // 增大处理超时(重度处理场景)

// 3. Rebalance Listener:处理分区撤销时保存状态
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 分区被撤销前:提交当前 Offset,保存本地状态
        consumer.commitSync(currentOffsets);
        saveLocalState(partitions);
        log.info("分区被撤销,已提交 Offset: {}", partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // 新分区分配后:加载本地状态,准备消费
        loadLocalState(partitions);
        log.info("新分区分配: {}", partitions);
    }
});

Consumer Lag 监控

# 命令行查看 Consumer Group 的 Lag
bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe --group order-processor-v2

# 输出示例:
# GROUP              TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# order-processor-v2 orders  0          1050            1055            5
# order-processor-v2 orders  1          2000            2000            0
# order-processor-v2 orders  2          980             990             10
# → 分区0 Lag=5, 分区2 Lag=10,消费稍有落后

Python Consumer 实战(confluent-kafka)

from confluent_kafka import Consumer, KafkaError, KafkaException
import json, signal, sys

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor-py',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,      # 手动提交
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 45000,
})

running = True

def shutdown(sig, frame):
    global running
    running = False

signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

consumer.subscribe(['orders'])

try:
    while running:
        # 拉取消息,超时 1 秒
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f'已到分区末尾: {msg.topic()} [{msg.partition()}]')
            else:
                raise KafkaException(msg.error())
            continue

        # 正常消息处理
        order = json.loads(msg.value().decode('utf-8'))
        print(f'处理订单: {order["id"]}, 金额: {order["amount"]}')

        # 手动提交 Offset
        consumer.commit(msg, asynchronous=False)

finally:
    consumer.close()  # 提交剩余 Offset,通知 Group Coordinator
    print('Consumer 已关闭')

多线程消费模式

KafkaConsumer 不是线程安全的:不能在多个线程中共享同一个 Consumer 实例。多线程消费有两种主流模式:

模式一:每线程一个 Consumer(推荐)

// 每个线程拥有独立的 KafkaConsumer,最简单可靠
ExecutorService executor = Executors.newFixedThreadPool(partitionCount);
for (int i = 0; i < partitionCount; i++) {
    executor.submit(new ConsumerWorker("orders", "group-id", kafkaProps));
}

模式二:单 Consumer + 线程池处理

// 单 Consumer 负责 poll,业务处理交给线程池(解耦 I/O 和 CPU 密集处理)
BlockingQueue<ConsumerRecord<String, String>> queue =
    new LinkedBlockingQueue<>(10000);

// Consumer 线程:只负责 poll 和入队
while (running) {
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
        queue.put(record); // 阻塞直到队列有空位
    }
    // 注意:Offset 提交需要在处理完成后谨慎管理,避免乱序提交
}