Chapter 08

Schema Registry and Message Formats

Managing message-format evolution with Avro and Schema Registry: eliminating version coupling between producers and consumers

Why a Schema Registry Is Needed

When messages are transmitted as plain JSON, consumers have to guess at the message structure: nothing constrains field names, type changes, or whether newly added fields are backward compatible. The moment a producer renames a field, consumers fail silently.

Schema Registry provides a centralized service for managing message formats.

Schema Registry workflow

Producer                      Schema Registry                    Consumer
   │                                │                                │
   │───POST /subjects/orders──────▶│                                │
   │◀────── schema_id=5 ───────────│                                │
   │                                │                                │
   │     Message = [0x00][schema_id=5][avro_bytes]                  │
   │───────────────────────────────────────────────────────────────▶│
   │                                │◀─────GET /schemas/ids/5───────│
   │                                │──────── schema json ─────────▶│
   │                                │                                │  (consumer deserializes)
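The framed message in the workflow above follows Confluent's wire format: one magic byte (0x00), a 4-byte big-endian schema ID, then the serialized payload. As a minimal sketch using only the standard library (a hypothetical helper, not part of the client API):

```python
import struct

def parse_confluent_header(payload: bytes):
    """Split a Confluent wire-format message into (schema_id, body_bytes)."""
    if len(payload) < 5 or payload[0] != 0:
        raise ValueError("not a Confluent-framed message")
    # bytes 1..4 hold the schema ID as a big-endian unsigned 32-bit int
    schema_id = struct.unpack(">I", payload[1:5])[0]
    return schema_id, payload[5:]

# Frame a fake message: magic byte 0x00, schema id 5, then the body
framed = b"\x00" + struct.pack(">I", 5) + b"\x02hi"
print(parse_confluent_header(framed))  # (5, b'\x02hi')
```

This framing is why a consumer can look up `GET /schemas/ids/5` without any out-of-band coordination with the producer.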

部署 Schema Registry(Docker)

# docker-compose.yml: append the Schema Registry service
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

# Verify the deployment
curl http://localhost:8081/subjects   # list all subjects

The Avro Format

Apache Avro is a data serialization system widely used across the Hadoop ecosystem, offering a compact binary encoding and rich schema support.

// orders-value.avsc (Avro schema file; the // comments are illustrative, strip them in real JSON)
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.ecommerce",
  "fields": [
    {"name": "id",         "type": "string"},
    {"name": "userId",     "type": "string"},
    {"name": "amount",     "type": "double"},
    {"name": "currency",   "type": "string", "default": "CNY"},
    {"name": "status",     "type": {
      "type": "enum",
      "name": "OrderStatus",
      "symbols": ["PENDING", "PAID", "SHIPPED", "DELIVERED", "CANCELLED"]
    }},
    {"name": "createdAt",  "type": {"type": "long", "logicalType": "timestamp-millis"}},
    // optional field (union with null)
    {"name": "couponCode", "type": ["null", "string"], "default": null}
  ]
}
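Part of Avro's compactness comes from the variable-length zigzag encoding it uses for int and long values (the same varint scheme Protobuf uses): small magnitudes, positive or negative, take few bytes. A self-contained sketch of how a long such as createdAt is encoded:

```python
def zigzag_encode(n: int) -> bytes:
    """Avro-style zigzag varint for a 64-bit long."""
    z = (n << 1) ^ (n >> 63)  # map 0, -1, 1, -2, ... to 0, 1, 2, 3, ...
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # set the continuation bit
        else:
            out.append(byte)
            return bytes(out)

print(zigzag_encode(1))    # b'\x02'
print(zigzag_encode(-1))   # b'\x01'
print(zigzag_encode(64))   # b'\x80\x01'
```

A millisecond timestamp like 1712500000000 fits in 6 bytes this way, versus 13 characters as a JSON decimal string.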

Python Client Integration (confluent-kafka-python)

from confluent_kafka import Producer, Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField

# ── Schema Registry client ──
schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

# Read the Avro schema file
with open('orders-value.avsc') as f:
    order_schema = f.read()

# Create the serializer (registers the schema automatically)
avro_serializer = AvroSerializer(
    schema_registry_client,
    order_schema,
    lambda order, ctx: {  # Python dict → Avro record
        'id': order['id'],
        'userId': order['user_id'],
        'amount': order['amount'],
        'currency': order.get('currency', 'CNY'),
        'status': order['status'],
        'createdAt': int(order['created_at'] * 1000),
        'couponCode': order.get('coupon_code'),
    }
)

# ── Avro Producer ──
producer = Producer({'bootstrap.servers': 'localhost:9092'})

order_data = {
    'id': 'order-001', 'user_id': 'u-100',
    'amount': 199.5, 'status': 'PENDING',
    'created_at': 1712500000.0
}

producer.produce(
    topic='orders',
    value=avro_serializer(order_data, SerializationContext('orders', MessageField.VALUE)),
    key=StringSerializer('utf_8')(order_data['id'], None)
)
producer.flush()

# ── Avro Consumer ──
avro_deserializer = AvroDeserializer(schema_registry_client)

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'avro-consumer',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['orders'])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    order = avro_deserializer(msg.value(), SerializationContext('orders', MessageField.VALUE))
    print(f'Order: {order}')

Schema Compatibility Rules

Schema Registry enforces compatibility checks, preventing incompatible schema changes from breaking existing consumers.

BACKWARD (default): new consumers can read old data (add fields with defaults; delete fields). Upgrade consumers first, then producers.
FORWARD: old consumers can read new data (add fields; delete fields with defaults). Upgrade producers first, then consumers.
FULL: satisfies both BACKWARD and FORWARD. The strictest level; recommended for production.
NONE: no compatibility check. Use only when rewriting a schema from scratch.

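The BACKWARD rule can be illustrated with a toy version of Avro's schema-resolution logic (a hypothetical helper, not the library's API): for each field in the reader's schema, take the writer's value if present, otherwise fall back to the reader's default, and fail if neither exists.

```python
def read_with_schema(record: dict, reader_fields: list) -> dict:
    """Toy Avro-style resolution of a written record against a reader schema."""
    out = {}
    for f in reader_fields:
        if f["name"] in record:
            out[f["name"]] = record[f["name"]]       # writer supplied the field
        elif "default" in f:
            out[f["name"]] = f["default"]            # fall back to the default
        else:
            raise ValueError(f"field {f['name']!r} missing and has no default")
    return out

old_record = {"id": "order-001", "amount": 199.5}
# BACKWARD-compatible change: the reader added a field WITH a default
new_fields = [{"name": "id"}, {"name": "amount"},
              {"name": "currency", "default": "CNY"}]
print(read_with_schema(old_record, new_fields))
# {'id': 'order-001', 'amount': 199.5, 'currency': 'CNY'}
```

Adding a field without a default would make `read_with_schema` raise on old records, which is exactly the change BACKWARD mode rejects at registration time.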
# Set the compatibility level for a subject
curl -X PUT http://localhost:8081/config/orders-value \
  -H 'Content-Type: application/json' \
  -d '{"compatibility": "FULL"}'

# Test whether a schema is compatible (returns {"is_compatible":true/false})
curl -X POST http://localhost:8081/compatibility/subjects/orders-value/versions/latest \
  -H 'Content-Type: application/json' \
  -d '{
    "schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[...]}"
  }'

# List all versions of a subject
curl http://localhost:8081/subjects/orders-value/versions

# Fetch the schema at a specific version
curl http://localhost:8081/subjects/orders-value/versions/1
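The subject name orders-value in these calls comes from the default TopicNameStrategy: the topic name plus a -key or -value suffix, depending on which part of the message the schema describes. A sketch of that convention (hypothetical helper):

```python
def subject_name(topic: str, is_key: bool = False) -> str:
    """Default TopicNameStrategy: '<topic>-key' or '<topic>-value'."""
    suffix = "key" if is_key else "value"
    return f"{topic}-{suffix}"

print(subject_name("orders"))        # orders-value
print(subject_name("orders", True))  # orders-key
```

Other strategies (per-record-type rather than per-topic) exist in the Confluent serializers, but the examples in this chapter all assume the default.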

The Protobuf Format (integration with the gRPC ecosystem)

// orders.proto
syntax = "proto3";

package com.example.ecommerce;

enum OrderStatus {
  PENDING   = 0;
  PAID      = 1;
  SHIPPED   = 2;
  DELIVERED = 3;
  CANCELLED = 4;
}

message Order {
  string       id          = 1;
  string       user_id     = 2;
  double       amount      = 3;
  string       currency    = 4;
  OrderStatus  status      = 5;
  int64        created_at  = 6;
  optional string coupon_code = 7;
}

# Python Protobuf producer
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
import orders_pb2

proto_serializer = ProtobufSerializer(
    orders_pb2.Order, schema_registry_client,
    conf={'use.deprecated.format': False}  # recent client versions require this to be set explicitly
)

order_proto = orders_pb2.Order(
    id="order-002",
    user_id="u-200",
    amount=88.0,
    currency="CNY",
    status=orders_pb2.OrderStatus.PAID,  # OrderStatus is a top-level enum, not nested in Order
    created_at=1712500000000
)

producer.produce(
    topic="orders-proto",
    value=proto_serializer(order_proto, SerializationContext("orders-proto", MessageField.VALUE))
)

JSON Schema is also supported: Schema Registry additionally handles JSON Schema (schema validation layered on the JSON format), balancing human readability with format constraints. It suits debugging phases, or interoperating with systems that cannot consume Avro.
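As a rough sketch, a JSON Schema equivalent of the Order record above might look like the following (the field set mirrors the Avro schema; the specific draft and keywords are our choice, not mandated by Schema Registry):

```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Order",
  "type": "object",
  "properties": {
    "id":         {"type": "string"},
    "userId":     {"type": "string"},
    "amount":     {"type": "number"},
    "currency":   {"type": "string", "default": "CNY"},
    "status":     {"enum": ["PENDING", "PAID", "SHIPPED", "DELIVERED", "CANCELLED"]},
    "createdAt":  {"type": "integer"},
    "couponCode": {"type": ["string", "null"]}
  },
  "required": ["id", "userId", "amount", "status", "createdAt"]
}
```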