Chapter 03

Topic 与 Partition 深度

分区规划、副本机制、ISR 与数据保留——Kafka 持久化层的完整图景

Topic 创建策略

创建 Topic 时需要权衡两个核心参数:分区数(partitions)副本因子(replication-factor)

# 创建生产级 Topic:6分区,副本因子3
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic ecommerce.order.created \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=604800000 \     # 7天保留
  --config min.insync.replicas=2          # 最少2个副本同步才允许写入

# 查看详情
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic ecommerce.order.created
# 输出示例:
# Topic: ecommerce.order.created  PartitionCount: 6  ReplicationFactor: 3
# Partition: 0  Leader: 2  Replicas: 2,3,1  Isr: 2,3,1
# Partition: 1  Leader: 3  Replicas: 3,1,2  Isr: 3,1,2

Partition 分配机制

1. 默认轮询分配(无 Key)

当 Producer 发送消息未指定 Key 时,Kafka 使用粘性分区器(Sticky Partitioner,2.4+ 默认):在一个 batch 内将消息发往同一分区,batch 满或 linger 时间到后才切换分区。这比旧版轮询(每条消息切换分区)更高效。

2. 按 Key Hash 分配

指定 Key 的消息使用 murmur2(key) % numPartitions 确定目标分区。相同 Key 的消息总是发往同一分区,保证分区内有序。

热点分区问题:若 Key 分布不均匀(如以用户 ID 为 Key 但某大 V 流量极高),会导致某个分区成为热点,消费 Lag 积压。解决方案:在 Key 中加入随机后缀(userId + "_" + random(0,N)),代价是无法保证同一用户的消息顺序。

3. 自定义分区器(Java)

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class PriorityPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                          Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        String msgKey = (String) key;

        // VIP 订单发往分区 0(单独消费者处理,保证低延迟)
        if (msgKey != null && msgKey.startsWith("VIP_")) {
            return 0;
        }
        // 普通消息均匀分布到分区 1 ~ numPartitions-1
        return (msgKey.hashCode() & Integer.MAX_VALUE) % (numPartitions - 1) + 1;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

副本机制与 ISR

副本同步机制 Topic: orders, Partition: 0 Producer ──write──▶ ┌─────────────────────────┐ │ Broker 1 (Leader) │ │ [msg0][msg1][msg2][msg3] │ ← Log End Offset (LEO) = 4 └────────────┬────────────┘ Fetch │ Fetch ┌─────────────┴─────────────┐ ▼ ▼ ┌───────────────────┐ ┌───────────────────┐ │ Broker 2 (ISR) │ │ Broker 3 (ISR) │ │ [msg0][msg1][msg2]│ │ [msg0][msg1][msg2]│ │ LEO=3 (syncing) │ │ LEO=3 (syncing) │ └───────────────────┘ └───────────────────┘ High Water Mark (HWM) = 3 ← Consumer 只能读到 HWM

High Water Mark(HWM)是所有 ISR 副本都已确认写入的最大 Offset。Consumer 只能读取 HWM 以下的消息,确保已消费的消息不会因 Leader 切换而丢失。

ISR 收缩与扩展

数据保留策略

Kafka 通过 Topic 级别的配置控制数据保留,支持三种策略:

策略配置参数说明
按时间保留retention.ms默认 604800000(7天),-1 表示永久
按大小保留retention.bytes单个分区日志最大字节数,默认 -1(无限制)
日志压缩cleanup.policy=compact按 Key 保留最新值,旧版本消息被压缩删除
# 修改 Topic 的保留策略(动态修改,无需重启)

# 改为按大小:每个分区最多 1GB
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name orders \
  --alter --add-config retention.bytes=1073741824

# 启用日志压缩(适合 Key-Value 状态存储类 Topic)
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name user-profiles \
  --alter --add-config cleanup.policy=compact

# 查看 Topic 配置
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name orders --describe

日志压缩(Log Compaction)详解

cleanup.policy=compact 模式下,Kafka 保证每个 Key 至少保留最新的一条消息。适合用于实现类似 KV 存储的场景——比如用 Kafka 存储用户最新配置,即使消息积累了百万条,压缩后只保留每个用户的最新值。

日志压缩过程 压缩前(Head 区域不被压缩,Tail 区域定期压缩): [K1:v1][K2:v1][K1:v2][K3:v1][K2:v2][K1:v3] ↑旧 ↑最新 压缩后(每个 Key 只保留最新值): [K2:v2][K3:v1][K1:v3] Tombstone(墓碑消息):Value 为 null 的消息,表示删除该 Key [K2:null] ← 经过一段时间后 K2 的所有消息被彻底删除

消息格式:RecordBatch

Kafka 0.11+ 使用 RecordBatch 格式,消息以批次存储,支持 Header、压缩、事务和幂等性。

字段说明
baseOffset批次第一条消息的 Offset
partitionLeaderEpochLeader 选举轮次,用于检测过期 Offset
magic消息格式版本(当前为 2)
compression压缩类型(NONE/GZIP/SNAPPY/LZ4/ZSTD)
timestampTypeCREATE_TIME(Producer 写入时间)或 LOG_APPEND_TIME(Broker 追加时间)
producerId / producerEpoch幂等 Producer 和事务 ID
headers键值对元数据,用于路由、追踪(如 trace-id)

日志段(Log Segment)

每个 Partition 的数据存储在一系列日志段(Segment)文件中,每个 Segment 包含:

# 查看 Partition 0 的日志文件
ls -la /tmp/kafka-logs/orders-0/
# 00000000000000000000.log     ← 从 Offset 0 开始的段
# 00000000000000000000.index
# 00000000000000000000.timeindex
# 00000000001073741824.log     ← 从 Offset 约 100万 开始的段
# ...

# 解析日志文件内容(调试用)
bin/kafka-dump-log.sh \
  --files /tmp/kafka-logs/orders-0/00000000000000000000.log \
  --print-data-log

分区数量规划

分区数的选择直接影响吞吐量和运维复杂度。常用估算公式:

# 分区数 = max(生产者吞吐 / 单分区写入速度, 消费者吞吐 / 单分区读取速度)

# 示例:
# 目标吞吐:500 MB/s
# 单 Broker 写入速度:~100 MB/s
# → 至少需要 5 个分区(分散到不同 Broker)

# 消费端:20个 Consumer 实例,每个处理 30 MB/s
# → 至少需要 20 个分区(每个 Consumer 一个分区)

# 实际建议:
# - 初始分区数 = 预期高峰 QPS / 单分区 QPS 上限 * 2(留余量)
# - 避免超过 Broker 数量 * 2000(每个分区都会占用文件句柄)
# - 消费者数量不能超过分区数(超出部分空闲)
# - 分区数是并行度上限,可以增加但不能减少

经验法则:如果不确定,从 6 个分区开始(能均匀分配到 2/3/6 个 Consumer),后期根据 Lag 监控按需增加。过多的分区(>万级别)会增加 Controller 的元数据压力和 Rebalance 时间。