核心 JMX 监控指标
Kafka 通过 JMX(Java Management Extensions)暴露丰富的运行时指标。以下是生产环境必须关注的关键指标:
| 指标名称(JMX MBean) | 含义 | 告警阈值建议 |
|---|---|---|
kafka.server:MessagesInPerSec | 每秒写入消息数 | 监控趋势,突降需告警 |
kafka.server:BytesInPerSec | 每秒写入字节数 | 监控趋势,接近网卡带宽告警 |
kafka.server:UnderReplicatedPartitions | 未完全同步的分区数 | 大于 0 立即告警 |
kafka.server:ActiveControllerCount | 活跃 Controller 数(集群内应为1) | 不等于 1 立即告警 |
kafka.server:RequestQueueSize | 请求队列积压 | 持续大于 0 告警 |
kafka.network:RequestMetrics.LocalTimeMs | 请求在 Leader 上的处理时间 | P99 大于 100ms 告警 |
kafka.log:LogFlushRateAndTimeMs | 日志 fsync 延迟 | P99 大于 1000ms 告警 |
kafka.consumer:consumer-fetch-manager-metrics.records-lag-max | Consumer 最大 Lag | 根据业务 SLA 设置 |
Prometheus JMX Exporter 配置
# 1. 下载 JMX Exporter Agent
wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.20.0/jmx_prometheus_javaagent-0.20.0.jar
# 2. kafka-jmx.yml 配置(使用开源社区 Kafka 模板)
rules:
- pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
name: kafka_server_$1_$2
type: GAUGE
labels:
clientId: "$3"
topic: "$4"
partition: "$5"
- pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
name: kafka_server_$1_$2
type: GAUGE
labels:
clientId: "$3"
broker: "$4:$5"
# 3. Kafka 启动脚本中添加 JVM Agent
export KAFKA_JMX_OPTS="-javaagent:/opt/jmx_exporter.jar=9090:/opt/kafka-jmx.yml"
# 验证 Metrics 端点
curl http://localhost:9090/metrics | grep kafka_server
Prometheus 采集配置
# prometheus.yml
scrape_configs:
- job_name: 'kafka-brokers'
static_configs:
- targets:
- 'broker1:9090'
- 'broker2:9090'
- 'broker3:9090'
scrape_interval: 15s
- job_name: 'kafka-consumer-lag'
# 使用 kafka-lag-exporter 单独采集 Consumer Lag
static_configs:
- targets: ['lag-exporter:9999']
三层性能调优
OS 层调优
# /etc/sysctl.conf 调整(需要 root)
# 增大文件描述符上限(每个分区需要多个文件句柄)
fs.file-max = 1000000
# /etc/security/limits.conf
kafka soft nofile 1000000
kafka hard nofile 1000000
# 网络缓冲区(提升吞吐量)
net.core.rmem_max = 134217728 # 128MB
net.core.wmem_max = 134217728
net.ipv4.tcp_rmem = 4096 65536 134217728
net.ipv4.tcp_wmem = 4096 65536 134217728
# 减少 PageCache 脏数据回写延迟
vm.dirty_ratio = 60
vm.dirty_background_ratio = 5
# 禁用 swap(避免 JVM GC 触发 swap 导致延迟抖动)
vm.swappiness = 1
# 磁盘调度器(SSD 使用 noop 或 deadline)
echo noop > /sys/block/sdb/queue/scheduler
JVM 调优
# kafka-server-start.sh 中的 KAFKA_HEAP_OPTS
# 小集群(吞吐适中)
export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
# 大规模集群(推荐 G1GC)
export KAFKA_JVM_PERFORMANCE_OPTS="
-server
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35
-XX:+ExplicitGCInvokesConcurrent
-Djava.awt.headless=true
-Xms6g -Xmx6g
"
# GC 日志(用于分析停顿)
export KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=/var/log/kafka/gc.log:time,tags:filecount=10,filesize=100m"
Kafka Broker 调优参数
# server.properties 关键调优参数
# 网络与 I/O 线程数(根据 CPU 核心数和分区数调整)
num.network.threads=8 # 处理网络请求(接收/发送)
num.io.threads=16 # 处理磁盘 I/O
num.replica.fetchers=4 # Follower 同步线程数
# 缓冲区(减少小 I/O)
socket.send.buffer.bytes=1048576 # 1MB
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600 # 100MB
# 副本同步优化
replica.lag.time.max.ms=30000
min.insync.replicas=2
# 日志段(平衡恢复速度和文件数量)
log.segment.bytes=1073741824 # 1GB 段文件
log.roll.hours=168 # 最多 7 天滚动一次(避免大量小段)
log.flush.interval.messages=10000 # 每 10000 条消息 fsync(通常依赖 OS 调度)
SSL 加密配置
# 1. 生成 CA 证书和 Broker 证书(生产建议使用 HashiCorp Vault 管理)
keytool -genkey -keyalg RSA -keystore kafka.broker1.keystore.jks \
-keysize 2048 -validity 365 -alias broker1 \
-dname "CN=broker1.kafka.internal,OU=Kafka,O=Example,C=CN"
# 2. server.properties 配置 SSL 监听器
listeners=PLAINTEXT://:9092,SSL://:9093
ssl.keystore.location=/var/ssl/private/kafka.broker1.keystore.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
ssl.truststore.location=/var/ssl/private/kafka.broker1.truststore.jks
ssl.truststore.password=truststore_password
ssl.client.auth=required # 要求客户端证书认证(双向 TLS)
# 3. 客户端(Python)SSL 配置
producer = Producer({
'bootstrap.servers': 'broker1:9093',
'security.protocol': 'SSL',
'ssl.ca.location': '/var/ssl/ca.crt',
'ssl.certificate.location': '/var/ssl/client.crt',
'ssl.key.location': '/var/ssl/client.key',
})
SASL 认证
# SASL/PLAIN 配置(简单用户名密码,需要 SSL 加密传输)
# server.properties
listeners=SASL_SSL://:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# JAAS 配置文件 kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_producer="producer-secret"
user_consumer="consumer-secret";
};
# 启动时加载 JAAS
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
# SCRAM-SHA-256(更安全,密码存储在 ZooKeeper/Kafka 中)
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=secret]' \
--entity-type users --entity-name producer-app
ACL 权限控制
# 允许 producer-app 向 orders Topic 写入
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
--add --allow-principal User:producer-app \
--operation Write --topic orders
# 允许 consumer-group-app 从 orders Topic 读取
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
--add --allow-principal User:consumer-app \
--operation Read --topic orders
# 允许消费者使用消费组
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
--add --allow-principal User:consumer-app \
--operation Read --group order-processor-v2
# 查看 Topic 的 ACL
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
--list --topic orders
# 删除 ACL
bin/kafka-acls.sh --bootstrap-server localhost:9092 \
--remove --allow-principal User:consumer-app \
--operation Read --topic orders
磁盘容量规划
# 磁盘容量估算公式:
# 所需磁盘 = 写入速率 × 保留时间 × 副本因子 / 压缩比
# 示例:
# 写入速率: 100 MB/s
# 保留时间: 7天 = 604800秒
# 副本因子: 3
# 压缩比: 2.5x(LZ4 压缩 JSON)
# → 所需磁盘 = 100 × 604800 × 3 / 2.5 ≈ 72,576 GB ≈ 72 TB(3节点各约24TB)
# Kafka 磁盘使用量查看
bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 \
--topic-list orders,users \
--describe | python3 -m json.tool
# 添加新 Broker 后的分区重新平衡
# 1. 生成重新平衡计划
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--topics-to-move-json-file topics.json \
--broker-list "1,2,3,4" \
--generate
# 2. 执行重新平衡
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--reassignment-json-file reassignment-plan.json \
--execute
# 3. 查看进度
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--reassignment-json-file reassignment-plan.json \
--verify
监控告警优先级:最优先关注 UnderReplicatedPartitions(大于 0 说明副本不健康)和 Consumer Lag(大于阈值说明消费积压)。这两个指标直接影响系统可用性和数据时效性,需要配置即时告警(PagerDuty 或企业微信机器人)。