Why You Need a Schema Registry
When messages are shipped as plain JSON, consumers have to guess the message structure: nothing enforces field names, type changes, or whether a newly added field is backward compatible. The moment a producer renames a field, consumers start failing silently.
Schema Registry provides a centralized service for managing message formats:
- Producers register a schema and receive a schema ID (an integer)
- Messages carry only the schema ID (a 5-byte header) plus the serialized data
- Consumers look up the schema by its ID and deserialize
- The Registry enforces compatibility rules on every new schema version
Schema Registry Workflow
Producer Schema Registry Consumer
│ │ │
│──POST /subjects/orders──▶│ │
│◀──── schema_id=5 ───────│ │
│ │ │
│ Message = [0x00][schema_id=5][avro_bytes] │
│─────────────────────────────────────────────────▶│
│ │◀──GET /schemas/ids/5──│
│ │──── schema json ──────▶│
│                          │  Consumer deserializes │
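The Message line in the diagram is Confluent's wire format: a magic byte 0x00, a 4-byte big-endian schema ID, then the serialized payload. A minimal parsing sketch, standard-library Python only:

```python
import struct

def parse_confluent_header(payload: bytes):
    """Split a Confluent-framed message into (schema_id, body).

    Wire format: 1 magic byte (0x00) + 4-byte big-endian schema ID + data.
    """
    if len(payload) < 5 or payload[0] != 0:
        raise ValueError('not Confluent wire format')
    schema_id = struct.unpack('>I', payload[1:5])[0]
    return schema_id, payload[5:]

# A fabricated frame for illustration: schema_id=5, body b'avro-bytes'
schema_id, body = parse_confluent_header(b'\x00\x00\x00\x00\x05avro-bytes')
print(schema_id, body)  # 5 b'avro-bytes'
```

This is why the payload stays small: only 5 bytes of framing per message, while the full schema lives in the Registry.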
Deploying Schema Registry (Docker)
# docker-compose.yml: add a Schema Registry service
schema-registry:
image: confluentinc/cp-schema-registry:7.6.0
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
# Verify the deployment
curl http://localhost:8081/subjects  # list all subjects
The Avro Format
Apache Avro is a data serialization system widely used across the Hadoop ecosystem, offering a compact binary encoding and rich schema support.
// orders-value.avsc (Avro schema file)
{
"type": "record",
"name": "Order",
"namespace": "com.example.ecommerce",
"fields": [
{"name": "id", "type": "string"},
{"name": "userId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "CNY"},
{"name": "status", "type": {
"type": "enum",
"name": "OrderStatus",
"symbols": ["PENDING", "PAID", "SHIPPED", "DELIVERED", "CANCELLED"]
}},
    {"name": "createdAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "couponCode", "type": ["null", "string"], "default": null}
]
}
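That compact binary encoding is concrete: Avro writes int/long values (and string/bytes lengths) as zigzag-encoded varints, and field names never appear in the payload. A sketch of that encoding, for illustration only, not a full Avro encoder:

```python
def avro_encode_long(n: int) -> bytes:
    """Encode an integer the way Avro encodes int/long:
    zigzag (maps -1, 1, -2, 2, ... to 1, 2, 3, 4, ...) then a base-128 varint.
    Illustration only; assumes n fits in a signed 64-bit range.
    """
    z = (n << 1) ^ (n >> 63)           # zigzag: move the sign bit to bit 0
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits, continuation flag set
        z >>= 7
    out.append(z)
    return bytes(out)

print(avro_encode_long(1).hex())    # 02
print(avro_encode_long(-1).hex())   # 01
print(avro_encode_long(199).hex())  # 8e03 -- two bytes for a 3-digit value
```

Small magnitudes take a single byte, which is why Avro payloads are typically far smaller than the equivalent JSON.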
Python Client Integration (confluent-kafka-python)
from confluent_kafka import Producer, Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
# ── Schema Registry client ──
schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
# Read the Avro schema file
with open('orders-value.avsc') as f:
order_schema = f.read()
# Create the serializer (registers the schema automatically)
avro_serializer = AvroSerializer(
schema_registry_client,
order_schema,
    lambda order, ctx: {  # Python dict -> Avro record
        'id': order['id'],
        'userId': order['user_id'],
        'amount': order['amount'],
        'currency': order.get('currency', 'CNY'),
        'status': order['status'],
        'createdAt': int(order['created_at'] * 1000),
        'couponCode': order.get('coupon_code'),
    }
)
# ── Avro Producer ──
producer = Producer({'bootstrap.servers': 'localhost:9092'})
order_data = {
'id': 'order-001', 'user_id': 'u-100',
'amount': 199.5, 'status': 'PENDING',
'created_at': 1712500000.0
}
producer.produce(
topic='orders',
value=avro_serializer(order_data, SerializationContext('orders', MessageField.VALUE)),
key=StringSerializer('utf_8')(order_data['id'], None)
)
producer.flush()
# ── Avro Consumer ──
avro_deserializer = AvroDeserializer(schema_registry_client)
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'avro-consumer',
'auto.offset.reset': 'earliest',
})
consumer.subscribe(['orders'])
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        order = avro_deserializer(msg.value(), SerializationContext('orders', MessageField.VALUE))
        print(f'Order: {order}')
finally:
    consumer.close()  # leave the group cleanly (commits final offsets under auto-commit)
Schema Compatibility Rules
Schema Registry enforces compatibility checks so that an incompatible schema change cannot break existing consumers.
| Compatibility mode | Allowed changes | Typical scenario |
|---|---|---|
| BACKWARD (default) | New consumers can read old data (add fields with defaults, delete fields) | Upgrade consumers first, then producers |
| FORWARD | Old consumers can read new data (add fields, delete fields with defaults) | Upgrade producers first, then consumers |
| FULL | Both BACKWARD and FORWARD | Strictest; recommended for production |
| NONE | No compatibility checking | Use when rewriting a schema from scratch |
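As a toy illustration of the BACKWARD rule (not the Registry's actual resolution algorithm, which also checks types, unions, and enums), the core idea for record schemas is that every field the new schema adds must carry a default, so a new reader can fill it in when decoding old data:

```python
def backward_compatible(old_fields, new_fields):
    """Toy BACKWARD check: a consumer on the new schema can decode data
    written with the old schema iff every newly added field has a default."""
    old_names = {f['name'] for f in old_fields}
    return all(f['name'] in old_names or 'default' in f for f in new_fields)

v1 = [{'name': 'id', 'type': 'string'}]
v2_ok = v1 + [{'name': 'channel', 'type': 'string', 'default': 'web'}]
v2_bad = v1 + [{'name': 'channel', 'type': 'string'}]
print(backward_compatible(v1, v2_ok))   # True
print(backward_compatible(v1, v2_bad))  # False
```

The symmetric intuition holds for FORWARD: fields the new schema deletes must have had defaults, so an old reader can still fill them in.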
# Set the compatibility level for a subject
curl -X PUT http://localhost:8081/config/orders-value \
-H 'Content-Type: application/json' \
-d '{"compatibility": "FULL"}'
# Test whether a new schema is compatible (returns {"is_compatible":true/false})
curl -X POST http://localhost:8081/compatibility/subjects/orders-value/versions/latest \
-H 'Content-Type: application/json' \
-d '{
"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[...]}"
}'
# List all versions of a subject
curl http://localhost:8081/subjects/orders-value/versions
# Fetch the schema at a specific version
curl http://localhost:8081/subjects/orders-value/versions/1
The Protobuf Format (integrates with the gRPC ecosystem)
// orders.proto
syntax = "proto3";
package com.example.ecommerce;
enum OrderStatus {
PENDING = 0;
PAID = 1;
SHIPPED = 2;
DELIVERED = 3;
CANCELLED = 4;
}
message Order {
string id = 1;
string user_id = 2;
double amount = 3;
string currency = 4;
OrderStatus status = 5;
int64 created_at = 6;
optional string coupon_code = 7;
}
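The producer below imports a generated orders_pb2 module. Assuming protoc and a matching protobuf Python package are installed, it is generated from the .proto file like this:

```shell
# Generate orders_pb2.py in the current directory from orders.proto
protoc --python_out=. orders.proto
```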
# Python Protobuf producer
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
import orders_pb2
proto_serializer = ProtobufSerializer(
orders_pb2.Order, schema_registry_client
)
order_proto = orders_pb2.Order(
id="order-002",
user_id="u-200",
amount=88.0,
currency="CNY",
    status=orders_pb2.OrderStatus.PAID,  # OrderStatus is a top-level enum, not nested in Order
created_at=1712500000000
)
producer.produce(
topic="orders-proto",
value=proto_serializer(order_proto, SerializationContext("orders-proto", MessageField.VALUE))
)
JSON Schema is also supported: Schema Registry can manage JSON Schema as well (schema validation layered on the JSON format), combining human readability with format constraints. It is a good fit during debugging, or when interoperating with systems that cannot consume Avro.