Chapter 09

监控、调优与安全

从 JMX 指标到 Grafana 仪表盘——打造生产级稳定、可观测、安全的 Kafka 集群

核心 JMX 监控指标

Kafka 通过 JMX(Java Management Extensions)暴露丰富的运行时指标。以下是生产环境必须关注的关键指标:

指标名称(JMX MBean)含义告警阈值建议
kafka.server:MessagesInPerSec每秒写入消息数监控趋势,突降需告警
kafka.server:BytesInPerSec每秒写入字节数监控趋势,接近网卡带宽告警
kafka.server:UnderReplicatedPartitions未完全同步的分区数大于 0 立即告警
kafka.server:ActiveControllerCount活跃 Controller 数(集群内应为1)不等于 1 立即告警
kafka.server:RequestQueueSize请求队列积压持续大于 0 告警
kafka.network:RequestMetrics.LocalTimeMs请求在 Leader 上的处理时间P99 大于 100ms 告警
kafka.log:LogFlushRateAndTimeMs日志 fsync 延迟P99 大于 1000ms 告警
kafka.consumer:consumer-fetch-manager-metrics.records-lag-maxConsumer 最大 Lag根据业务 SLA 设置

Prometheus JMX Exporter 配置

# 1. 下载 JMX Exporter Agent
wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.20.0/jmx_prometheus_javaagent-0.20.0.jar

# 2. kafka-jmx.yml 配置(使用开源社区 Kafka 模板)
rules:
  - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
    name: kafka_server_$1_$2
    type: GAUGE
    labels:
      clientId: "$3"
      topic: "$4"
      partition: "$5"
  - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
    name: kafka_server_$1_$2
    type: GAUGE
    labels:
      clientId: "$3"
      broker: "$4:$5"

# 3. Kafka 启动脚本中添加 JVM Agent
export KAFKA_JMX_OPTS="-javaagent:/opt/jmx_exporter.jar=9090:/opt/kafka-jmx.yml"

# 验证 Metrics 端点
curl http://localhost:9090/metrics | grep kafka_server

Prometheus 采集配置

# prometheus.yml
scrape_configs:
  - job_name: 'kafka-brokers'
    static_configs:
      - targets:
          - 'broker1:9090'
          - 'broker2:9090'
          - 'broker3:9090'
    scrape_interval: 15s

  - job_name: 'kafka-consumer-lag'
    # 使用 kafka-lag-exporter 单独采集 Consumer Lag
    static_configs:
      - targets: ['lag-exporter:9999']

三层性能调优

OS 层调优

# /etc/sysctl.conf 调整(需要 root)

# 增大文件描述符上限(每个分区需要多个文件句柄)
fs.file-max = 1000000
# /etc/security/limits.conf
kafka soft nofile 1000000
kafka hard nofile 1000000

# 网络缓冲区(提升吞吐量)
net.core.rmem_max = 134217728    # 128MB
net.core.wmem_max = 134217728
net.ipv4.tcp_rmem = 4096 65536 134217728
net.ipv4.tcp_wmem = 4096 65536 134217728

# 减少 PageCache 脏数据回写延迟
vm.dirty_ratio = 60
vm.dirty_background_ratio = 5

# 禁用 swap(避免 JVM GC 触发 swap 导致延迟抖动)
vm.swappiness = 1

# 磁盘调度器(SSD 使用 noop 或 deadline)
echo noop > /sys/block/sdb/queue/scheduler

JVM 调优

# kafka-server-start.sh 中的 KAFKA_HEAP_OPTS

# 小集群(吞吐适中)
export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"

# 大规模集群(推荐 G1GC)
export KAFKA_JVM_PERFORMANCE_OPTS="
  -server
  -XX:+UseG1GC
  -XX:MaxGCPauseMillis=20
  -XX:InitiatingHeapOccupancyPercent=35
  -XX:+ExplicitGCInvokesConcurrent
  -Djava.awt.headless=true
  -Xms6g -Xmx6g
"

# GC 日志(用于分析停顿)
export KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=/var/log/kafka/gc.log:time,tags:filecount=10,filesize=100m"

Kafka Broker 调优参数

# server.properties 关键调优参数

# 网络与 I/O 线程数(根据 CPU 核心数和分区数调整)
num.network.threads=8         # 处理网络请求(接收/发送)
num.io.threads=16             # 处理磁盘 I/O
num.replica.fetchers=4       # Follower 同步线程数

# 缓冲区(减少小 I/O)
socket.send.buffer.bytes=1048576    # 1MB
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600 # 100MB

# 副本同步优化
replica.lag.time.max.ms=30000
min.insync.replicas=2

# 日志段(平衡恢复速度和文件数量)
log.segment.bytes=1073741824     # 1GB 段文件
log.roll.hours=168               # 最多 7 天滚动一次(避免大量小段)
log.flush.interval.messages=10000 # 每 10000 条消息 fsync(通常依赖 OS 调度)

SSL 加密配置

# 1. 生成 CA 证书和 Broker 证书(生产建议使用 HashiCorp Vault 管理)
keytool -genkey -keyalg RSA -keystore kafka.broker1.keystore.jks \
  -keysize 2048 -validity 365 -alias broker1 \
  -dname "CN=broker1.kafka.internal,OU=Kafka,O=Example,C=CN"

# 2. server.properties 配置 SSL 监听器
listeners=PLAINTEXT://:9092,SSL://:9093
ssl.keystore.location=/var/ssl/private/kafka.broker1.keystore.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
ssl.truststore.location=/var/ssl/private/kafka.broker1.truststore.jks
ssl.truststore.password=truststore_password
ssl.client.auth=required   # 要求客户端证书认证(双向 TLS)

# 3. 客户端(Python)SSL 配置
producer = Producer({
    'bootstrap.servers': 'broker1:9093',
    'security.protocol': 'SSL',
    'ssl.ca.location': '/var/ssl/ca.crt',
    'ssl.certificate.location': '/var/ssl/client.crt',
    'ssl.key.location': '/var/ssl/client.key',
})

SASL 认证

# SASL/PLAIN 配置(简单用户名密码,需要 SSL 加密传输)

# server.properties
listeners=SASL_SSL://:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# JAAS 配置文件 kafka_server_jaas.conf
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_producer="producer-secret"
  user_consumer="consumer-secret";
};

# 启动时加载 JAAS
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"

# SCRAM-SHA-256(更安全,密码存储在 ZooKeeper/Kafka 中)
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=secret]' \
  --entity-type users --entity-name producer-app

ACL 权限控制

# 允许 producer-app 向 orders Topic 写入
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:producer-app \
  --operation Write --topic orders

# 允许 consumer-group-app 从 orders Topic 读取
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:consumer-app \
  --operation Read --topic orders

# 允许消费者使用消费组
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:consumer-app \
  --operation Read --group order-processor-v2

# 查看 Topic 的 ACL
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
  --list --topic orders

# 删除 ACL
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
  --remove --allow-principal User:consumer-app \
  --operation Read --topic orders

磁盘容量规划

# 磁盘容量估算公式:
# 所需磁盘 = 写入速率 × 保留时间 × 副本因子 / 压缩比

# 示例:
# 写入速率: 100 MB/s
# 保留时间: 7天 = 604800秒
# 副本因子: 3
# 压缩比: 2.5x(LZ4 压缩 JSON)
# → 所需磁盘 = 100 × 604800 × 3 / 2.5 ≈ 72,576 GB ≈ 72 TB(3节点各约24TB)

# Kafka 磁盘使用量查看
bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 \
  --topic-list orders,users \
  --describe | python3 -m json.tool

# 添加新 Broker 后的分区重新平衡
# 1. 生成重新平衡计划
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3,4" \
  --generate

# 2. 执行重新平衡
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassignment-plan.json \
  --execute

# 3. 查看进度
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassignment-plan.json \
  --verify

监控告警优先级:最优先关注 UnderReplicatedPartitions(大于 0 说明副本不健康)和 Consumer Lag(大于阈值说明消费积压)。这两个指标直接影响系统可用性和数据时效性,需要配置即时告警(PagerDuty 或企业微信机器人)。