Chapter 10

Production Practice and Best Practices

Complete Python/Go code · Delay queues · Dead letter queues · Saga · Event Sourcing · Technology selection

Complete Python Producer and Consumer in Practice

# requirements.txt
# confluent-kafka==2.3.0
# python-dotenv==1.0.0

import os, json, logging, signal
from typing import Optional
from dataclasses import dataclass
from datetime import datetime
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)

# ── Configuration (read from environment variables) ──
KAFKA_BOOTSTRAP = os.getenv('KAFKA_BOOTSTRAP', 'localhost:9092')
KAFKA_TOPIC     = os.getenv('KAFKA_TOPIC', 'orders')
KAFKA_GROUP     = os.getenv('KAFKA_GROUP', 'order-service')

@dataclass
class OrderEvent:
    order_id: str
    user_id: str
    amount: float
    status: str
    event_type: str
    timestamp: Optional[str] = None

    def to_dict(self) -> dict:
        d = self.__dict__.copy()
        if not d['timestamp']:
            d['timestamp'] = datetime.utcnow().isoformat()
        return d

# ── Producer ──
class OrderProducer:
    def __init__(self):
        self._producer = Producer({
            'bootstrap.servers': KAFKA_BOOTSTRAP,
            'acks': 'all',
            'enable.idempotence': True,
            'compression.type': 'lz4',
            'linger.ms': 5,
            'batch.size': 65536,
            'retries': 10,
            'delivery.timeout.ms': 120000,
        })

    def publish(self, event: OrderEvent) -> None:
        def _on_delivery(err, msg):
            if err:
                logger.error('Delivery failed: %s, key=%s', err, msg.key())
            else:
                logger.debug('Delivered: partition=%d, offset=%d',
                             msg.partition(), msg.offset())

        self._producer.produce(
            topic=KAFKA_TOPIC,
            key=event.order_id.encode(),
            value=json.dumps(event.to_dict(), ensure_ascii=False).encode(),
            callback=_on_delivery,
            headers={
                'event-type': event.event_type,
                'content-type': 'application/json',
            }
        )
        self._producer.poll(0)

    def close(self):
        self._producer.flush(timeout=30)

# ── Consumer ──
class OrderConsumer:
    def __init__(self, handler):
        self._handler = handler
        self._running = True
        self._consumer = Consumer({
            'bootstrap.servers': KAFKA_BOOTSTRAP,
            'group.id': KAFKA_GROUP,
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,
            'max.poll.interval.ms': 300000,
        })
        signal.signal(signal.SIGTERM, lambda *_: self.stop())
        signal.signal(signal.SIGINT,  lambda *_: self.stop())

    def run(self):
        self._consumer.subscribe([KAFKA_TOPIC])
        logger.info('Consumer started, subscribed to %s', KAFKA_TOPIC)
        try:
            while self._running:
                msg = self._consumer.poll(timeout=1.0)
                if msg is None: continue
                if msg.error():
                    if msg.error().code() != KafkaError._PARTITION_EOF:
                        raise KafkaException(msg.error())
                    continue
                event = json.loads(msg.value())
                self._handler(event)
                self._consumer.commit(msg, asynchronous=False)
        finally:
            self._consumer.close()
            logger.info('Consumer closed')

    def stop(self): self._running = False
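
A minimal usage sketch that wires the two classes together; the `handle_order` function and the order values are hypothetical placeholders for real business logic.

def handle_order(event: dict) -> None:
    # Placeholder business logic: a real service would update the DB, call APIs, etc.
    logger.info('Processing order %s (status=%s)', event['order_id'], event['status'])

if __name__ == '__main__':
    producer = OrderProducer()
    producer.publish(OrderEvent(order_id='o-1001', user_id='u-42', amount=99.5,
                                status='CREATED', event_type='order.created'))
    producer.close()   # flush pending messages before exiting

    consumer = OrderConsumer(handler=handle_order)
    consumer.run()     # blocks until SIGINT/SIGTERM triggers stop()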

High-Performance Go Consumer (segmentio/kafka-go)

// go.mod: github.com/segmentio/kafka-go v0.4.47

package main

import (
    "context"
    "encoding/json"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    kafka "github.com/segmentio/kafka-go"
)

type OrderEvent struct {
    OrderID   string  `json:"order_id"`
    UserID    string  `json:"user_id"`
    Amount    float64 `json:"amount"`
    Status    string  `json:"status"`
    EventType string  `json:"event_type"`
    Timestamp string  `json:"timestamp"`
}

func main() {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:        []string{"localhost:9092"},
        GroupID:        "order-processor-go",
        Topic:          "orders",
        MinBytes:       1e3,   // 1KB
        MaxBytes:       10e6,  // 10MB
        MaxWait:        500 * time.Millisecond,
        CommitInterval: 1 * time.Second,  // flush offset commits to the broker asynchronously at this interval (0 = synchronous)
        StartOffset:    kafka.FirstOffset,
    })
    defer reader.Close()

    ctx, cancel := signal.NotifyContext(context.Background(),
        syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    log.Println("Go Consumer 启动...")

    for {
        // FetchMessage does not auto-commit the offset
        msg, err := reader.FetchMessage(ctx)
        if err != nil {
            if ctx.Err() != nil {
                log.Println("收到退出信号,关闭 Consumer")
                return
            }
            log.Printf("读取消息失败: %v", err)
            continue
        }

        var event OrderEvent
        if err := json.Unmarshal(msg.Value, &event); err != nil {
            log.Printf("解析消息失败: %v, raw=%s", err, msg.Value)
        } else {
            processOrder(event)
        }

        // Manually commit the offset
        if err := reader.CommitMessages(ctx, msg); err != nil {
            log.Printf("提交 Offset 失败: %v", err)
        }
    }
}

func processOrder(event OrderEvent) {
    log.Printf("处理订单: id=%s, amount=%.2f, status=%s",
        event.OrderID, event.Amount, event.Status)
}

Delay Queue Implementation (Multi-Level Topic Approach)

Kafka has no native support for delayed messages. In production, this is commonly emulated with several topics, one per delay level:

Multi-level delay topic architecture:

Producer ──▶ delay-10s  ──▶ Delay Worker ──▶ orders   (10-second delay)
Producer ──▶ delay-60s  ──▶ Delay Worker ──▶ orders
Producer ──▶ delay-300s ──▶ Delay Worker ──▶ orders

(Each Delay Worker consumes its delay topic, checks the scheduled delivery time,
and forwards the message to the target topic only once the delay has elapsed.)
import time, json
from confluent_kafka import Producer, Consumer, TopicPartition

DELAY_LEVELS = {10: 'delay-10s', 60: 'delay-60s', 300: 'delay-300s'}

def send_delayed(producer, target_topic: str, key: str, value: dict, delay_seconds: int):
    """Send a delayed message by routing it to the closest delay-level topic."""
    # Pick the delay level closest to the requested delay
    level = min(DELAY_LEVELS.keys(), key=lambda x: abs(x - delay_seconds))
    delay_topic = DELAY_LEVELS[level]

    value['_deliver_at'] = time.time() + delay_seconds
    value['_target_topic'] = target_topic

    producer.produce(topic=delay_topic, key=key, value=json.dumps(value))
    producer.poll(0)

def delay_worker(delay_topic: str, forward_producer):
    """Delay worker: consume a delay topic and forward messages only once their delay has elapsed."""
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': f'delay-worker-{delay_topic}',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
    })
    consumer.subscribe([delay_topic])

    while True:
        msg = consumer.poll(0.1)
        if not msg or msg.error(): continue

        data = json.loads(msg.value())
        deliver_at = data.pop('_deliver_at')
        target_topic = data.pop('_target_topic')

        remaining = deliver_at - time.time()
        if remaining > 0:
            time.sleep(min(remaining, 1.0))  # wait at most 1 second before re-checking
            # Seek back so this message is fetched again on the next poll
            consumer.seek(TopicPartition(msg.topic(), msg.partition(), msg.offset()))
            continue

        # Delay has elapsed: forward to the target topic
        forward_producer.produce(
            topic=target_topic,
            key=msg.key(),
            value=json.dumps(data)
        )
        forward_producer.poll(0)  # serve delivery callbacks
        consumer.commit(msg)
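
A sketch of how these pieces could be wired together, with one worker per delay level; the threading model and the order payload are illustrative assumptions, not part of the original scheme.

import threading

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# One delay worker per delay-level topic; the confluent-kafka Producer is thread-safe.
for topic in DELAY_LEVELS.values():
    threading.Thread(target=delay_worker, args=(topic, producer), daemon=True).start()

# Example: schedule a hypothetical order-timeout check roughly 60 seconds from now.
send_delayed(producer, target_topic='orders', key='o-1001',
             value={'order_id': 'o-1001', 'action': 'check_timeout'},
             delay_seconds=60)
producer.flush()
# (In a real service the main thread keeps running; here the daemon workers exit with it.)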

Dead Letter Queue (DLQ)

"""DLQ pattern: once a message has failed more times than the retry limit, it is moved to a DLQ topic."""

DLQ_TOPIC = 'orders-dlq'
MAX_RETRIES = 3

def process_with_dlq(consumer, dlq_producer, handler):
    retry_counts = {}  # { (topic, partition, offset): retry_count }

    while True:
        msg = consumer.poll(1.0)
        if not msg or msg.error(): continue

        msg_id = (msg.topic(), msg.partition(), msg.offset())
        retries = retry_counts.get(msg_id, 0)

        try:
            data = json.loads(msg.value())
            handler(data)
            retry_counts.pop(msg_id, None)  # clear the retry counter on success
            consumer.commit(msg)

        except Exception as e:
            if retries >= MAX_RETRIES:
                # Retry limit exceeded: send the message to the DLQ
                dlq_payload = {
                    'original_topic': msg.topic(),
                    'original_partition': msg.partition(),
                    'original_offset': msg.offset(),
                    'error': str(e),
                    'failed_at': datetime.utcnow().isoformat(),
                    'payload': msg.value().decode(),
                }
                dlq_producer.produce(
                    topic=DLQ_TOPIC,
                    key=msg.key(),
                    value=json.dumps(dlq_payload)
                )
                retry_counts.pop(msg_id, None)
                consumer.commit(msg)
                logger.error('Message moved to DLQ: %s', msg_id)
            else:
                retry_counts[msg_id] = retries + 1
                logger.warning('Processing failed, retry %d: %s', retries + 1, e)
                # Seek back so the failed message is fetched and retried on the next poll
                consumer.seek(TopicPartition(msg.topic(), msg.partition(), msg.offset()))
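
Messages that land in the DLQ can later be inspected and re-driven once the underlying fault is fixed. A minimal re-drive sketch, assuming the same consumer/producer configuration as above:

def redrive_dlq(consumer, producer):
    """Re-publish DLQ entries back to their original topic."""
    consumer.subscribe([DLQ_TOPIC])
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        entry = json.loads(msg.value())
        # The original payload was stored as a string inside the DLQ envelope.
        producer.produce(topic=entry['original_topic'], key=msg.key(), value=entry['payload'])
        producer.poll(0)
        consumer.commit(msg)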

Saga Pattern and Event Sourcing

Kafka-driven Saga pattern (order payment flow):

OrderService     ──order.created──────▶ Kafka ──▶ PaymentService
PaymentService   ──payment.succeeded──▶ Kafka ──▶ InventoryService
InventoryService ──stock.deducted─────▶ Kafka ──▶ OrderService (status updated to SHIPPED)

If payment.failed is emitted instead, OrderService receives it and rolls back the
order state (compensating transaction).
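
A minimal Python sketch of the OrderService side of this flow, reacting to payment events. Event names follow the diagram; `mark_order_paid` and `cancel_order` are hypothetical domain functions.

def handle_payment_event(event: dict, producer) -> None:
    """React to PaymentService events: advance the saga or run the compensating step."""
    if event['event_type'] == 'payment.succeeded':
        mark_order_paid(event['order_id'])                  # local state change
        producer.produce('orders', key=event['order_id'],   # emit the next saga event
                         value=json.dumps({'order_id': event['order_id'],
                                           'event_type': 'order.paid'}))
        producer.poll(0)
    elif event['event_type'] == 'payment.failed':
        # Compensating transaction: undo the local change instead of a distributed rollback.
        cancel_order(event['order_id'], reason=event.get('error', 'payment failed'))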

Integration with Flink / Spark Streaming (Overview)

Framework        | Kafka integration                                          | Typical scenarios
Apache Flink     | FlinkKafkaConsumer / KafkaSource (official connector)      | Complex stream processing, CEP, exactly-once with stateful fault tolerance
Spark Streaming  | Direct API (reads offsets directly, bypassing ZooKeeper)   | Unified batch/stream processing, large-scale machine learning
Kafka Streams    | Built in (Chapter 6 of this tutorial)                      | Lightweight stream processing, no separate cluster needed
// Flink + Kafka Source example (Java)
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("orders")
    .setGroupId("flink-consumer")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> stream = env.fromSource(
    source, WatermarkStrategy.noWatermarks(), "kafka-source"
);
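
For Spark, the newer Structured Streaming API consumes Kafka in a similar way. A PySpark sketch follows (the classic DStream Direct API mentioned in the table above works analogously):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("orders-stream").getOrCreate()

# Read the same "orders" topic as a streaming DataFrame.
orders = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "orders")
          .option("startingOffsets", "latest")
          .load())

# Kafka rows expose key/value as binary columns; cast them before processing.
query = (orders.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .writeStream.format("console").start())
query.awaitTermination()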

Selection Guidance: When to Use Kafka

Requirement                                   | Recommended option          | Rationale
Simple task queue, low throughput             | RabbitMQ / Redis Lists      | Simple to operate, lower latency, features are sufficient
High-throughput messaging (>100k msgs/s)      | Kafka                       | Sequential disk writes and page cache give a clear advantage
Messages must be replayable                   | Kafka                       | Persistent commit log; rewind to any offset
CDC / data pipelines                          | Kafka + Debezium            | Natural fit with the Kafka Connect ecosystem
Real-time stream processing with state        | Kafka Streams / Flink       | Built-in state management and fault tolerance
Event bus for decoupling microservices        | Kafka                       | Independent consumer groups, complete event history
Precise, ordered delayed messages             | RocketMQ                    | Native support for 18 delay levels
Transactional messages (distributed txns)     | RocketMQ / Kafka txn API    | RocketMQ transactional messages are more mature and easier to use

Kafka's sweet spot: when your system needs any two or more of these four properties (high throughput, message durability, multiple consumers processing independently, and message replay), Kafka is usually the best choice. For simple task queues or delayed jobs, Redis or RabbitMQ has lower operational overhead and is the more pragmatic choice.

Production Checklist