Python 完整生产者消费者实战
# requirements.txt
# confluent-kafka==2.3.0
# python-dotenv==1.0.0
import json
import logging
import os
import signal
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

from confluent_kafka import Consumer, KafkaError, KafkaException, Producer
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
# ── Configuration (read from environment variables, with local-dev defaults) ──
KAFKA_BOOTSTRAP = os.getenv('KAFKA_BOOTSTRAP', 'localhost:9092')  # broker bootstrap list
KAFKA_TOPIC = os.getenv('KAFKA_TOPIC', 'orders')  # topic produced to and consumed from
KAFKA_GROUP = os.getenv('KAFKA_GROUP', 'order-service')  # consumer group id
@dataclass
class OrderEvent:
    """Domain event describing an order state change.

    ``timestamp`` may be left as ``None``; :meth:`to_dict` fills it with the
    current UTC time at serialization time (the instance itself is not mutated).
    """
    order_id: str
    user_id: str
    amount: float
    status: str
    event_type: str  # e.g. 'order.created'
    timestamp: Optional[str] = None  # ISO-8601 string; auto-filled when falsy

    def to_dict(self) -> dict:
        """Return a JSON-serializable dict, defaulting timestamp to now (UTC)."""
        d = self.__dict__.copy()
        if not d['timestamp']:
            # Timezone-aware now() replaces the deprecated naive datetime.utcnow().
            d['timestamp'] = datetime.now(timezone.utc).isoformat()
        return d
# ── Producer ──
class OrderProducer:
    """Publishes OrderEvent messages to the configured Kafka topic."""

    def __init__(self):
        # Durable, idempotent producer: acks=all + idempotence avoids
        # duplicates on retry; lz4 compression + batching for throughput.
        conf = {
            'bootstrap.servers': KAFKA_BOOTSTRAP,
            'acks': 'all',
            'enable.idempotence': True,
            'compression.type': 'lz4',
            'linger.ms': 5,
            'batch.size': 65536,
            'retries': 10,
            'delivery.timeout.ms': 120000,
        }
        self._producer = Producer(conf)

    def publish(self, event: OrderEvent) -> None:
        """Asynchronously produce one event, keyed by order_id."""
        payload = json.dumps(event.to_dict(), ensure_ascii=False)

        def _on_delivery(err, msg):
            # Delivery report, invoked from poll()/flush().
            if err:
                logger.error('发送失败: %s, key=%s', err, msg.key())
            else:
                logger.debug('发送成功: partition=%d, offset=%d',
                             msg.partition(), msg.offset())

        headers = {
            'event-type': event.event_type,
            'content-type': 'application/json',
        }
        self._producer.produce(
            topic=KAFKA_TOPIC,
            key=event.order_id.encode(),
            value=payload.encode(),
            callback=_on_delivery,
            headers=headers,
        )
        # Serve pending delivery callbacks without blocking.
        self._producer.poll(0)

    def close(self):
        """Flush outstanding messages (up to 30 s) before shutdown."""
        self._producer.flush(timeout=30)
# ── Consumer ──
class OrderConsumer:
    """Blocking consume loop with manual offset commits and graceful shutdown."""

    def __init__(self, handler):
        self._handler = handler
        self._running = True
        conf = {
            'bootstrap.servers': KAFKA_BOOTSTRAP,
            'group.id': KAFKA_GROUP,
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,  # commit only after successful handling
            'max.poll.interval.ms': 300000,
        }
        self._consumer = Consumer(conf)
        # Let SIGTERM / Ctrl-C end the poll loop cleanly.
        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, lambda *_: self.stop())

    def run(self):
        """Consume until stop() is called; commit synchronously per message."""
        self._consumer.subscribe([KAFKA_TOPIC])
        logger.info('Consumer 启动,订阅 %s', KAFKA_TOPIC)
        try:
            while self._running:
                msg = self._consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                err = msg.error()
                if err:
                    # Partition EOF is informational; anything else is fatal.
                    if err.code() == KafkaError._PARTITION_EOF:
                        continue
                    raise KafkaException(err)
                self._handler(json.loads(msg.value()))
                self._consumer.commit(msg, asynchronous=False)
        finally:
            self._consumer.close()
            logger.info('Consumer 已关闭')

    def stop(self):
        """Signal the run() loop to exit after the current poll."""
        self._running = False
Go 高性能消费者(segmentio/kafka-go)
// go.mod: github.com/segmentio/kafka-go v0.4.47
package main
import (
"context"
"encoding/json"
"log"
"os"
"os/signal"
"syscall"
"time"
kafka "github.com/segmentio/kafka-go"
)
// OrderEvent mirrors the JSON payload produced by the Python producer
// in this tutorial (order domain event).
type OrderEvent struct {
	OrderID   string  `json:"order_id"`
	UserID    string  `json:"user_id"`
	Amount    float64 `json:"amount"`
	Status    string  `json:"status"`
	EventType string  `json:"event_type"`
	Timestamp string  `json:"timestamp"` // ISO-8601 string, not time.Time
}
// main runs a manual-commit Kafka consumer with graceful shutdown on
// SIGINT/SIGTERM. Offsets are committed only after processing, giving
// at-least-once delivery.
func main() {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        []string{"localhost:9092"},
		GroupID:        "order-processor-go",
		Topic:          "orders",
		MinBytes:       1e3,  // 1KB
		MaxBytes:       10e6, // 10MB
		MaxWait:        500 * time.Millisecond,
		CommitInterval: 1 * time.Second, // flush interval for committed offsets
		StartOffset:    kafka.FirstOffset,
	})
	defer reader.Close()
	// ctx is canceled on SIGINT/SIGTERM so FetchMessage returns and we exit.
	ctx, cancel := signal.NotifyContext(context.Background(),
		syscall.SIGINT, syscall.SIGTERM)
	defer cancel()
	log.Println("Go Consumer 启动...")
	for {
		// FetchMessage does NOT auto-commit the offset (unlike ReadMessage).
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				log.Println("收到退出信号,关闭 Consumer")
				return
			}
			log.Printf("读取消息失败: %v", err)
			continue
		}
		var event OrderEvent
		if err := json.Unmarshal(msg.Value, &event); err != nil {
			// Malformed payloads are logged and skipped; the offset is still
			// committed below so the consumer does not get stuck on them.
			log.Printf("解析消息失败: %v, raw=%s", err, msg.Value)
		} else {
			processOrder(event)
		}
		// Manually commit the offset after processing (at-least-once).
		if err := reader.CommitMessages(ctx, msg); err != nil {
			log.Printf("提交 Offset 失败: %v", err)
		}
	}
}
// processOrder handles one decoded order event; in this demo it just logs it.
func processOrder(event OrderEvent) {
	id, amount, status := event.OrderID, event.Amount, event.Status
	log.Printf("处理订单: id=%s, amount=%.2f, status=%s", id, amount, status)
}
延迟队列实现(多级 Topic 方案)
Kafka 原生不支持延迟消息,生产中通过多个延迟级别的 Topic 模拟:
多级延迟 Topic 架构
Producer ──▶ delay-10s ──▶ Delay Worker ──▶ orders
(10秒延迟) (消费后检查时间,
到期才转发到目标Topic)
Producer ──▶ delay-60s ──▶ Delay Worker ──▶ orders
Producer ──▶ delay-300s ──▶ Delay Worker ──▶ orders
import time, json
from confluent_kafka import Producer, Consumer
DELAY_LEVELS = {10: 'delay-10s', 60: 'delay-60s', 300: 'delay-300s'}  # seconds -> topic

def send_delayed(producer, target_topic: str, key: str, value: dict, delay_seconds: int):
    """Publish *value* to the delay topic whose level is closest to *delay_seconds*.

    The produced message carries routing metadata (``_deliver_at`` /
    ``_target_topic``) that a delay worker uses to forward it to
    *target_topic* once due.  The caller's dict is left untouched.
    """
    # Pick the delay level nearest to the requested delay.
    level = min(DELAY_LEVELS, key=lambda sec: abs(sec - delay_seconds))
    # Copy before annotating: the original mutated the caller's dict,
    # leaking internal routing keys back to the caller.
    payload = dict(value)
    payload['_deliver_at'] = time.time() + delay_seconds
    payload['_target_topic'] = target_topic
    producer.produce(topic=DELAY_LEVELS[level], key=key, value=json.dumps(payload))
    producer.poll(0)
def delay_worker(delay_topic: str, forward_producer):
    """Consume *delay_topic* and forward each message to its target once due.

    Messages carry ``_deliver_at`` / ``_target_topic`` metadata (see
    ``send_delayed``).  A not-yet-due message blocks its partition: the worker
    seeks back to its offset so the next poll re-delivers it.  That is fine
    because every message in one delay topic shares the same delay, so the
    head of the partition always matures first.
    """
    from confluent_kafka import TopicPartition  # local: keeps snippet self-contained

    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': f'delay-worker-{delay_topic}',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
    })
    consumer.subscribe([delay_topic])
    while True:
        msg = consumer.poll(0.1)
        if msg is None or msg.error():
            continue
        data = json.loads(msg.value())
        deliver_at = data.pop('_deliver_at')
        target_topic = data.pop('_target_topic')
        remaining = deliver_at - time.time()
        if remaining > 0:
            time.sleep(min(remaining, 1.0))  # wait at most 1 s, then re-check
            # BUG FIX: Message has no topic_partition() method; build a
            # TopicPartition explicitly to rewind to this offset.
            consumer.seek(TopicPartition(msg.topic(), msg.partition(), msg.offset()))
            continue
        # Due: forward to the real target topic, then commit the delay offset.
        forward_producer.produce(
            topic=target_topic,
            key=msg.key(),
            value=json.dumps(data)
        )
        forward_producer.poll(0)  # serve delivery callbacks
        consumer.commit(msg)
死信队列(Dead Letter Queue)
"""DLQ 模式:处理失败超过重试次数后,消息转移到 DLQ Topic"""
DLQ_TOPIC = 'orders-dlq'
MAX_RETRIES = 3
def process_with_dlq(consumer, dlq_producer, handler):
retry_counts = {} # { (topic, partition, offset): retry_count }
while True:
msg = consumer.poll(1.0)
if not msg or msg.error(): continue
msg_id = (msg.topic(), msg.partition(), msg.offset())
retries = retry_counts.get(msg_id, 0)
try:
data = json.loads(msg.value())
handler(data)
retry_counts.pop(msg_id, None) # 成功后清除计数
consumer.commit(msg)
except Exception as e:
if retries >= MAX_RETRIES:
# 超过重试次数,发送到 DLQ
dlq_payload = {
'original_topic': msg.topic(),
'original_partition': msg.partition(),
'original_offset': msg.offset(),
'error': str(e),
'failed_at': datetime.utcnow().isoformat(),
'payload': msg.value().decode(),
}
dlq_producer.produce(
topic=DLQ_TOPIC,
key=msg.key(),
value=json.dumps(dlq_payload)
)
retry_counts.pop(msg_id, None)
consumer.commit(msg)
logger.error('消息进入 DLQ: %s', msg_id)
else:
retry_counts[msg_id] = retries + 1
logger.warning('处理失败,第%d次重试: %s', retries + 1, e)
Saga 模式与事件溯源
Kafka 驱动的 Saga 编排模式(订单支付流程)
OrderService ──order.created──▶ Kafka
──▶ PaymentService ──payment.succeeded──▶ Kafka
──▶ InventoryService ──stock.deducted──▶ Kafka
──▶ OrderService
(更新状态为 SHIPPED)
若 payment.failed → OrderService 接收并回滚订单状态(补偿事务)
与 Flink / Spark Streaming 集成概述
| 框架 | Kafka 集成方式 | 适用场景 |
|---|---|---|
| Apache Flink | FlinkKafkaConsumer / KafkaSource(官方连接器) | 复杂流处理、CEP、精确一次 + 状态容错 |
| Spark Streaming | Direct API(直接读取 Offset,不经过 ZK) | 批流一体、大规模机器学习 |
| Kafka Streams | 内置(本教程第6章) | 轻量流处理,无需独立集群 |
// Flink + Kafka Source example (Java)
// Build a KafkaSource that reads string records from the 'orders' topic.
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("orders")
    .setGroupId("flink-consumer")
    .setStartingOffsets(OffsetsInitializer.latest())     // start from the newest records
    .setValueOnlyDeserializer(new SimpleStringSchema())  // deserialize value only; key ignored
    .build();
// Attach the source to the job graph; no watermarks (no event-time logic here).
DataStream<String> stream = env.fromSource(
    source, WatermarkStrategy.noWatermarks(), "kafka-source"
);
选型建议:何时用 Kafka
| 需求 | 推荐方案 | 理由 |
|---|---|---|
| 简单任务队列,低吞吐 | RabbitMQ / Redis Lists | 运维简单,延迟更低,功能够用 |
| 高吞吐消息传递(>10万/s) | Kafka | 顺序写磁盘、PageCache 优势明显 |
| 消息需要回溯重放 | Kafka | Commit Log 持久化,支持任意 Offset 回溯 |
| CDC / 数据管道 | Kafka + Debezium | 与 Kafka Connect 生态天然集成 |
| 实时流处理+状态计算 | Kafka Streams / Flink | 内置状态管理和容错 |
| 微服务解耦事件总线 | Kafka | 多消费者独立消费,完整事件历史 |
| 有序延迟消息(精确) | RocketMQ | 原生支持 18 种延迟级别 |
| 事务消息(分布式事务) | RocketMQ / Kafka 事务 API | RocketMQ 事务消息更成熟易用 |
Kafka 最甜蜜点:当你的系统需要高吞吐 + 消息持久化 + 多消费者独立处理 + 消息回溯这四个特性中的任意两个以上时,Kafka 通常是最佳选择。对于简单的任务队列或延迟任务,Redis 或 RabbitMQ 的运维成本更低,是更务实的选择。
生产检查清单
- Topic 分区数已根据吞吐量估算,副本因子 >= 3
- min.insync.replicas=2、acks=all 已配置
- Producer 开启幂等(enable.idempotence=true),Consumer 手动提交 Offset
- 消息格式使用 Schema Registry + Avro/Protobuf
- UnderReplicatedPartitions、Consumer Lag 告警已就位
- Prometheus + Grafana 仪表盘已部署
- SASL 认证 + SSL 加密 + ACL 授权已配置
- 死信队列 Topic 已创建,监控告警已就位
- 磁盘容量已规划,15-30% 余量留存
- JVM GC 日志已开启,定期分析 STW 停顿