Topic 创建策略
创建 Topic 时需要权衡两个核心参数:分区数(partitions)和副本因子(replication-factor)。
# 创建生产级 Topic:6分区,副本因子3
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic ecommerce.order.created \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=604800000 \ # 7天保留
--config min.insync.replicas=2 # 最少2个副本同步才允许写入
# 查看详情
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic ecommerce.order.created
# 输出示例:
# Topic: ecommerce.order.created PartitionCount: 6 ReplicationFactor: 3
# Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
# Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Partition 分配机制
1. 默认轮询分配(无 Key)
当 Producer 发送消息未指定 Key 时,Kafka 使用粘性分区器(Sticky Partitioner,2.4+ 默认):在一个 batch 内将消息发往同一分区,batch 满或 linger 时间到后才切换分区。这比旧版轮询(每条消息切换分区)更高效。
2. 按 Key Hash 分配
指定 Key 的消息使用 murmur2(key) % numPartitions 确定目标分区。相同 Key 的消息总是发往同一分区,保证分区内有序。
热点分区问题:若 Key 分布不均匀(如以用户 ID 为 Key 但某大 V 流量极高),会导致某个分区成为热点,消费 Lag 积压。解决方案:在 Key 中加入随机后缀(userId + "_" + random(0,N)),代价是无法保证同一用户的消息顺序。
3. 自定义分区器(Java)
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
public class PriorityPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
int numPartitions = cluster.partitionCountForTopic(topic);
String msgKey = (String) key;
// VIP 订单发往分区 0(单独消费者处理,保证低延迟)
if (msgKey != null && msgKey.startsWith("VIP_")) {
return 0;
}
// 普通消息均匀分布到分区 1 ~ numPartitions-1
return (msgKey.hashCode() & Integer.MAX_VALUE) % (numPartitions - 1) + 1;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
副本机制与 ISR
High Water Mark(HWM)是所有 ISR 副本都已确认写入的最大 Offset。Consumer 只能读取 HWM 以下的消息,确保已消费的消息不会因 Leader 切换而丢失。
ISR 收缩与扩展
- 收缩(移出 ISR):Follower 超过
replica.lag.time.max.ms(默认 30s)未发送 Fetch 请求,或落后 Leader 过多 - 扩展(加回 ISR):Follower 追上 Leader 的 LEO 后自动重新加入 ISR
- Leader 选举:Leader 宕机时,从 ISR 中选出 LEO 最大的副本作为新 Leader
数据保留策略
Kafka 通过 Topic 级别的配置控制数据保留,支持三种策略:
| 策略 | 配置参数 | 说明 |
|---|---|---|
| 按时间保留 | retention.ms | 默认 604800000(7天),-1 表示永久 |
| 按大小保留 | retention.bytes | 单个分区日志最大字节数,默认 -1(无限制) |
| 日志压缩 | cleanup.policy=compact | 按 Key 保留最新值,旧版本消息被压缩删除 |
# 修改 Topic 的保留策略(动态修改,无需重启)
# 改为按大小:每个分区最多 1GB
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name orders \
--alter --add-config retention.bytes=1073741824
# 启用日志压缩(适合 Key-Value 状态存储类 Topic)
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name user-profiles \
--alter --add-config cleanup.policy=compact
# 查看 Topic 配置
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name orders --describe
日志压缩(Log Compaction)详解
cleanup.policy=compact 模式下,Kafka 保证每个 Key 至少保留最新的一条消息。适合用于实现类似 KV 存储的场景——比如用 Kafka 存储用户最新配置,即使消息积累了百万条,压缩后只保留每个用户的最新值。
消息格式:RecordBatch
Kafka 0.11+ 使用 RecordBatch 格式,消息以批次存储,支持 Header、压缩、事务和幂等性。
| 字段 | 说明 |
|---|---|
| baseOffset | 批次第一条消息的 Offset |
| partitionLeaderEpoch | Leader 选举轮次,用于检测过期 Offset |
| magic | 消息格式版本(当前为 2) |
| compression | 压缩类型(NONE/GZIP/SNAPPY/LZ4/ZSTD) |
| timestampType | CREATE_TIME(Producer 写入时间)或 LOG_APPEND_TIME(Broker 追加时间) |
| producerId / producerEpoch | 幂等 Producer 和事务 ID |
| headers | 键值对元数据,用于路由、追踪(如 trace-id) |
日志段(Log Segment)
每个 Partition 的数据存储在一系列日志段(Segment)文件中,每个 Segment 包含:
.log— 实际消息数据,达到log.segment.bytes(默认 1GB)时滚动新建.index— 稀疏索引(Offset → 文件物理位置),间隔约 4KB 记录一条.timeindex— 时间戳索引(时间戳 → Offset),支持按时间范围查找
# 查看 Partition 0 的日志文件
ls -la /tmp/kafka-logs/orders-0/
# 00000000000000000000.log ← 从 Offset 0 开始的段
# 00000000000000000000.index
# 00000000000000000000.timeindex
# 00000000001073741824.log ← 从 Offset 约 100万 开始的段
# ...
# 解析日志文件内容(调试用)
bin/kafka-dump-log.sh \
--files /tmp/kafka-logs/orders-0/00000000000000000000.log \
--print-data-log
分区数量规划
分区数的选择直接影响吞吐量和运维复杂度。常用估算公式:
# 分区数 = max(生产者吞吐 / 单分区写入速度, 消费者吞吐 / 单分区读取速度)
# 示例:
# 目标吞吐:500 MB/s
# 单 Broker 写入速度:~100 MB/s
# → 至少需要 5 个分区(分散到不同 Broker)
# 消费端:20个 Consumer 实例,每个处理 30 MB/s
# → 至少需要 20 个分区(每个 Consumer 一个分区)
# 实际建议:
# - 初始分区数 = 预期高峰 QPS / 单分区 QPS 上限 * 2(留余量)
# - 避免超过 Broker 数量 * 2000(每个分区都会占用文件句柄)
# - 消费者数量不能超过分区数(超出部分空闲)
# - 分区数是并行度上限,可以增加但不能减少
经验法则:如果不确定,从 6 个分区开始(能均匀分配到 2/3/6 个 Consumer),后期根据 Lag 监控按需增加。过多的分区(>万级别)会增加 Controller 的元数据压力和 Rebalance 时间。