KafkaProducer 基础配置(Java)
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
Properties props = new Properties();
// ── 必填配置 ──
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
// ── 可靠性配置 ──
props.put("acks", "all"); // 等待所有 ISR 副本确认
props.put("retries", Integer.MAX_VALUE); // 开启幂等后自动重试
props.put("enable.idempotence", true); // 幂等 Producer
// ── 吞吐量调优 ──
props.put("batch.size", 65536); // 64KB 批次(默认 16KB)
props.put("linger.ms", 5); // 等待最多 5ms 凑批(默认 0)
props.put("compression.type", "lz4"); // LZ4 压缩
props.put("buffer.memory", 67108864L); // 发送缓冲区 64MB
// ── 超时与重试 ──
props.put("request.timeout.ms", 30000); // Broker 响应超时
props.put("delivery.timeout.ms", 120000); // 整体投递超时(含重试)
props.put("max.in.flight.requests.per.connection", 5); // 幂等时最大5
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
同步发送 vs 异步发送
// ── 同步发送(等待 Broker 确认)──
ProducerRecord<String, String> record =
new ProducerRecord<>("orders", "order-123", "{\"total\":99.9}");
try {
RecordMetadata metadata = producer.send(record).get(); // 阻塞等待
System.out.printf("发送成功: 分区=%d, Offset=%d%n",
metadata.partition(), metadata.offset());
} catch (Exception e) {
System.err.println("发送失败: " + e.getMessage());
}
// ── 异步发送(Callback 回调)── 推荐,不阻塞业务线程
producer.send(record, (RecordMetadata meta, Exception ex) -> {
if (ex != null) {
log.error("消息发送失败: topic={}, key={}",
record.topic(), record.key(), ex);
// 可以写入本地日志或告警
} else {
log.debug("发送成功: partition={}, offset={}",
meta.partition(), meta.offset());
}
});
// 程序退出前必须 flush + close
producer.flush(); // 等待所有缓冲消息发出
producer.close(); // 释放连接资源
批量发送参数调优
| 参数 | 默认值 | 调优建议 |
|---|---|---|
batch.size | 16384 (16KB) | 高吞吐场景调至 64KB~256KB |
linger.ms | 0 | 增大至 5~20ms 可显著提升批次填充率 |
buffer.memory | 33554432 (32MB) | 确保不小于 batch.size × 分区数 |
compression.type | none | 推荐 lz4(高速压缩)或 zstd(高压缩比) |
max.request.size | 1048576 (1MB) | 大消息场景需同步调整 Broker 的 message.max.bytes |
acks 可靠性配置
| acks 值 | 语义 | 丢消息风险 | 适用场景 |
|---|---|---|---|
0 | 不等待 Broker 确认,直接返回 | 极高(网络断开即丢) | 日志、指标(可丢失) |
1 | 等待 Leader 写入磁盘确认 | 中(Leader 宕机可能丢) | 一般业务消息 |
all(-1) | 等待所有 ISR 副本确认 | 极低 | 金融、订单等关键消息 |
acks=all 不等于不丢消息:若 min.insync.replicas=1(默认),则即使只有 Leader 一个副本,acks=all 也会成功。生产环境需同时设置 min.insync.replicas=2(3节点集群),确保至少 2 个副本写入成功才确认。
幂等 Producer(Idempotent Producer)
开启 enable.idempotence=true 后,Producer 获得一个 Producer ID(PID),每条消息附带单调递增的序列号。Broker 通过 PID + 分区 + 序列号去重,保证重试导致的重复消息不会被写入两次(单分区 Exactly-Once)。
// 幂等 Producer 配置(Java)
props.put("enable.idempotence", true);
// 以下配置会被自动设置为最佳值:
// acks=all, retries=Integer.MAX_VALUE, max.in.flight.requests.per.connection=5
消息压缩:各算法对比
| 压缩算法 | 压缩比 | 压缩速度 | 解压速度 | 推荐场景 |
|---|---|---|---|---|
| GZIP | 最高 | 慢 | 中 | 存储成本敏感,CPU 充足 |
| Snappy | 中 | 快 | 快 | 均衡场景(Google 内部用) |
| LZ4 | 中 | 极快 | 极快 | 高吞吐低延迟(推荐) |
| ZSTD | 高(接近GZIP) | 快 | 快 | 高压缩比+高速(Kafka 2.1+) |
压缩在 Producer 端还是 Broker 端?:压缩在 Producer 端完成,Broker 直接存储压缩后的 RecordBatch(无需解压再压缩)。若 Broker 配置的 compression.type 与 Producer 不一致,Broker 才会解压后重新压缩,这会显著增加 CPU 开销。建议保持 Producer 和 Broker 压缩类型一致,或将 Broker 设置为 producer(保持原样)。
事务 Producer:跨分区 Exactly-Once
幂等 Producer 只保证单分区内的 Exactly-Once。若需要跨多个 Topic/Partition 原子写入,需要使用事务 Producer。
// 事务 Producer 配置
props.put("enable.idempotence", true);
props.put("transactional.id", "order-service-tx-1"); // 全局唯一事务ID
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // 向 Broker 注册事务 ID
try {
producer.beginTransaction();
// 原子写入多个 Topic
producer.send(new ProducerRecord<>("orders", "order-123", orderJson));
producer.send(new ProducerRecord<>("inventory", "item-456", inventoryJson));
producer.send(new ProducerRecord<>("notifications", "user-789", emailJson));
producer.commitTransaction(); // 三条消息原子提交
} catch (ProducerFencedException e) {
// 同一 transactional.id 有新实例启动,当前实例被隔离(Fencing)
producer.close();
} catch (KafkaException e) {
producer.abortTransaction(); // 回滚本次事务
// 处理异常,可重试
}
Consumer 必须配置 isolation.level=read_committed:事务 Producer 写入的消息,Consumer 默认(read_uncommitted)可以读到事务未提交的消息。若业务需要 Exactly-Once,Consumer 端必须设置 isolation.level=read_committed,只读取已提交的事务消息,代价是会增加少量延迟(等待事务标记写入)。
Python Producer 实战(confluent-kafka)
from confluent_kafka import Producer
import json
# 创建 Producer
producer = Producer({
'bootstrap.servers': 'localhost:9092',
'acks': 'all',
'enable.idempotence': True,
'compression.type': 'lz4',
'batch.size': 65536,
'linger.ms': 5,
'retries': 10,
})
def delivery_callback(err, msg):
if err:
print(f'消息发送失败: {err}')
else:
print(f'发送成功: topic={msg.topic()}, partition={msg.partition()}, offset={msg.offset()}')
# 异步发送
orders = [
{"id": "order-001", "amount": 199.5, "user_id": "u-100"},
{"id": "order-002", "amount": 88.0, "user_id": "u-200"},
]
for order in orders:
producer.produce(
topic='orders',
key=order['id'], # 按订单 ID 路由到固定分区
value=json.dumps(order, ensure_ascii=False),
callback=delivery_callback,
headers={'source': 'order-service'} # 自定义 Header
)
producer.poll(0) # 触发 Callback,非阻塞
# 等待所有消息发送完毕
producer.flush(timeout=10)