Chapter 04

Producer 开发与调优

从基础发送到 Exactly-Once 语义——掌握高吞吐可靠消息生产的全部技巧

KafkaProducer 基础配置(Java)

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

Properties props = new Properties();

// ── 必填配置 ──
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
props.put("key.serializer",   StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

// ── 可靠性配置 ──
props.put("acks", "all");          // 等待所有 ISR 副本确认
props.put("retries", Integer.MAX_VALUE); // 开启幂等后自动重试
props.put("enable.idempotence", true); // 幂等 Producer

// ── 吞吐量调优 ──
props.put("batch.size", 65536);     // 64KB 批次(默认 16KB)
props.put("linger.ms", 5);           // 等待最多 5ms 凑批(默认 0)
props.put("compression.type", "lz4"); // LZ4 压缩
props.put("buffer.memory", 67108864L); // 发送缓冲区 64MB

// ── 超时与重试 ──
props.put("request.timeout.ms", 30000);   // Broker 响应超时
props.put("delivery.timeout.ms", 120000); // 整体投递超时(含重试)
props.put("max.in.flight.requests.per.connection", 5); // 幂等时最大5

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

同步发送 vs 异步发送

// ── 同步发送(等待 Broker 确认)──
ProducerRecord<String, String> record =
    new ProducerRecord<>("orders", "order-123", "{\"total\":99.9}");

try {
    RecordMetadata metadata = producer.send(record).get(); // 阻塞等待
    System.out.printf("发送成功: 分区=%d, Offset=%d%n",
        metadata.partition(), metadata.offset());
} catch (Exception e) {
    System.err.println("发送失败: " + e.getMessage());
}

// ── 异步发送(Callback 回调)── 推荐,不阻塞业务线程
producer.send(record, (RecordMetadata meta, Exception ex) -> {
    if (ex != null) {
        log.error("消息发送失败: topic={}, key={}",
            record.topic(), record.key(), ex);
        // 可以写入本地日志或告警
    } else {
        log.debug("发送成功: partition={}, offset={}",
            meta.partition(), meta.offset());
    }
});

// 程序退出前必须 flush + close
producer.flush(); // 等待所有缓冲消息发出
producer.close(); // 释放连接资源

批量发送参数调优

Producer 内部批处理流程 send(record) ┌─────────────────────────────────┐ │ RecordAccumulator │ ← 内存缓冲区 │ Partition 0: [r1][r2][r3] │ ← batch.size=64KB │ Partition 1: [r4] │ │ Partition 2: [r5][r6][r7][r8] │ ← batch 满,立即发送 └────────────────┬────────────────┘ linger.ms=5ms 到期 │ 或 batch 已满 Sender 线程(后台) │ 压缩 → 网络发送 Broker Leader
参数默认值调优建议
batch.size16384 (16KB)高吞吐场景调至 64KB~256KB
linger.ms0增大至 5~20ms 可显著提升批次填充率
buffer.memory33554432 (32MB)确保不小于 batch.size × 分区数
compression.typenone推荐 lz4(高速压缩)或 zstd(高压缩比)
max.request.size1048576 (1MB)大消息场景需同步调整 Broker 的 message.max.bytes

acks 可靠性配置

acks 值语义丢消息风险适用场景
0不等待 Broker 确认,直接返回极高(网络断开即丢)日志、指标(可丢失)
1等待 Leader 写入磁盘确认中(Leader 宕机可能丢)一般业务消息
all(-1)等待所有 ISR 副本确认极低金融、订单等关键消息

acks=all 不等于不丢消息:若 min.insync.replicas=1(默认),则即使只有 Leader 一个副本,acks=all 也会成功。生产环境需同时设置 min.insync.replicas=2(3节点集群),确保至少 2 个副本写入成功才确认。

幂等 Producer(Idempotent Producer)

开启 enable.idempotence=true 后,Producer 获得一个 Producer ID(PID),每条消息附带单调递增的序列号。Broker 通过 PID + 分区 + 序列号去重,保证重试导致的重复消息不会被写入两次(单分区 Exactly-Once)。

// 幂等 Producer 配置(Java)
props.put("enable.idempotence", true);
// 以下配置会被自动设置为最佳值:
// acks=all, retries=Integer.MAX_VALUE, max.in.flight.requests.per.connection=5

消息压缩:各算法对比

压缩算法压缩比压缩速度解压速度推荐场景
GZIP最高存储成本敏感,CPU 充足
Snappy均衡场景(Google 内部用)
LZ4极快极快高吞吐低延迟(推荐)
ZSTD高(接近GZIP)高压缩比+高速(Kafka 2.1+)

压缩在 Producer 端还是 Broker 端?:压缩在 Producer 端完成,Broker 直接存储压缩后的 RecordBatch(无需解压再压缩)。若 Broker 配置的 compression.type 与 Producer 不一致,Broker 才会解压后重新压缩,这会显著增加 CPU 开销。建议保持 Producer 和 Broker 压缩类型一致,或将 Broker 设置为 producer(保持原样)。

事务 Producer:跨分区 Exactly-Once

幂等 Producer 只保证单分区内的 Exactly-Once。若需要跨多个 Topic/Partition 原子写入,需要使用事务 Producer

// 事务 Producer 配置
props.put("enable.idempotence", true);
props.put("transactional.id", "order-service-tx-1"); // 全局唯一事务ID

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // 向 Broker 注册事务 ID

try {
    producer.beginTransaction();

    // 原子写入多个 Topic
    producer.send(new ProducerRecord<>("orders", "order-123", orderJson));
    producer.send(new ProducerRecord<>("inventory", "item-456", inventoryJson));
    producer.send(new ProducerRecord<>("notifications", "user-789", emailJson));

    producer.commitTransaction(); // 三条消息原子提交
} catch (ProducerFencedException e) {
    // 同一 transactional.id 有新实例启动,当前实例被隔离(Fencing)
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction(); // 回滚本次事务
    // 处理异常,可重试
}

Consumer 必须配置 isolation.level=read_committed:事务 Producer 写入的消息,Consumer 默认(read_uncommitted)可以读到事务未提交的消息。若业务需要 Exactly-Once,Consumer 端必须设置 isolation.level=read_committed,只读取已提交的事务消息,代价是会增加少量延迟(等待事务标记写入)。

Python Producer 实战(confluent-kafka)

from confluent_kafka import Producer
import json

# 创建 Producer
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'compression.type': 'lz4',
    'batch.size': 65536,
    'linger.ms': 5,
    'retries': 10,
})

def delivery_callback(err, msg):
    if err:
        print(f'消息发送失败: {err}')
    else:
        print(f'发送成功: topic={msg.topic()}, partition={msg.partition()}, offset={msg.offset()}')

# 异步发送
orders = [
    {"id": "order-001", "amount": 199.5, "user_id": "u-100"},
    {"id": "order-002", "amount": 88.0,  "user_id": "u-200"},
]

for order in orders:
    producer.produce(
        topic='orders',
        key=order['id'],                     # 按订单 ID 路由到固定分区
        value=json.dumps(order, ensure_ascii=False),
        callback=delivery_callback,
        headers={'source': 'order-service'}  # 自定义 Header
    )
    producer.poll(0)  # 触发 Callback,非阻塞

# 等待所有消息发送完毕
producer.flush(timeout=10)