Kafka Connect Framework Overview
Kafka Connect is the scalable data-integration framework that ships with Kafka: it moves data between external systems and Kafka through configuration rather than custom code.
Kafka Connect data-flow architecture
┌─────────────┐ ┌──────────────────────┐ ┌─────────────────┐
│ MySQL DB │────▶│ Source Connector │────▶│ │
│ PostgreSQL │────▶│ (Debezium CDC) │────▶│ │
│ MongoDB │────▶│ │────▶│ Kafka Cluster │
│ S3 Files │────▶│ JDBC Source │────▶│ │
└─────────────┘ └──────────────────────┘ │ │
│ │
┌─────────────┐ ┌──────────────────────┐ │ │
│Elasticsearch│◀────│ Sink Connector │◀────│ │
│ S3/GCS │◀────│ │◀────│ │
│ HDFS │◀────│ JDBC Sink │◀────│ │
│ ClickHouse │◀────│ │◀────│ │
└─────────────┘ └──────────────────────┘ └─────────────────┘
Deployment Modes
| Mode | Configuration | Use case |
|---|---|---|
| Standalone | Single process, driven by local config files | Development and testing, small single-node workloads |
| Distributed | Multiple workers, managed via REST API, automatic fault tolerance | Production, high availability |
# Start standalone mode (development / debugging)
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/connect-file-source.properties
# Start distributed mode
bin/connect-distributed.sh config/connect-distributed.properties
# Core settings in connect-distributed.properties
bootstrap.servers=localhost:9092
group.id=connect-cluster # Connect cluster group ID
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
rest.port=8083 # REST API port
plugin.path=/opt/kafka/plugins # Directory containing connector JARs
Common Connectors and Configuration
Debezium MySQL Source Connector (CDC)
Debezium implements change data capture (CDC) by reading MySQL's binlog, publishing every INSERT/UPDATE/DELETE as a change event to a Kafka topic.
# Create the connector via the REST API (JSON config)
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type: application/json' \
-d '{
"name": "mysql-orders-cdc",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-host",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz_password",
"database.server.id": "1",
"database.server.name": "mysql-prod",
"database.include.list": "ecommerce",
"table.include.list": "ecommerce.orders,ecommerce.users",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.mysql-prod",
"snapshot.mode": "initial",
"include.schema.changes": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"
}
}'
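The `unwrap` SMT configured above (`ExtractNewRecordState`) flattens Debezium's change-event envelope down to the new row state. A rough pure-Python sketch of that effect — the `before`/`after`/`op`/`source` envelope shape follows Debezium's format, but the payload values are made up:

```python
# Sketch: what ExtractNewRecordState does to a Debezium change event.
# The envelope shape (before/after/op/source) follows Debezium's format;
# the concrete field values here are illustrative only.

def unwrap(event, drop_tombstones=False):
    """Emulate io.debezium.transforms.ExtractNewRecordState (simplified)."""
    if event is None:  # tombstone record (null value)
        return None if drop_tombstones else event
    return event["after"]  # keep only the new row state

change_event = {
    "before": {"id": 42, "status": "pending"},
    "after": {"id": 42, "status": "paid"},
    "op": "u",  # c=create, u=update, d=delete
    "source": {"db": "ecommerce", "table": "orders"},
}

flat = unwrap(change_event)
print(flat)  # {'id': 42, 'status': 'paid'}
```

Downstream sinks (Elasticsearch, S3) then see plain row records instead of the full CDC envelope.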
Elasticsearch Sink Connector
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type: application/json' \
-d '{
"name": "es-orders-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "3",
"topics": "mysql-prod.ecommerce.orders",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.ignore": "false",
"schema.ignore": "true",
"batch.size": "1000",
"flush.timeout.ms": "10000",
"max.retries": "5",
"retry.backoff.ms": "100",
"transforms": "extractKey",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "id"
}
}'
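The `extractKey` transform above pulls the `id` field out of the record's key struct so the Elasticsearch sink can use it as the document `_id`, which makes repeated CDC updates for the same row converge to one document. A minimal stand-in sketch (not the Connect API):

```python
# Sketch: effect of org.apache.kafka.connect.transforms.ExtractField$Key.
# A Connect key struct like {"id": 1001} becomes the bare value 1001,
# which the Elasticsearch sink then uses as the document _id (upsert).

def extract_field_key(record_key, field):
    return record_key[field]

index = {}  # stand-in for the Elasticsearch index
for key, value in [
    ({"id": 1001}, {"status": "pending"}),
    ({"id": 1001}, {"status": "paid"}),  # later update, same key
]:
    index[extract_field_key(key, "id")] = value  # upsert by _id

print(index)  # {1001: {'status': 'paid'}}
```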
S3 Sink Connector (data archiving)
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type: application/json' \
-d '{
"name": "s3-orders-archive",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "2",
"topics": "orders",
"s3.region": "cn-north-1",
"s3.bucket.name": "kafka-archive",
"s3.part.size": "67108864",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"flush.size": "10000",
"rotate.interval.ms": "3600000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd",
"locale": "zh_CN",
"timezone": "Asia/Shanghai",
"timestamp.extractor": "RecordField",
"timestamp.field": "created_at"
}
}'
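With the `RecordField` timestamp extractor, objects are partitioned by the record's own `created_at` rather than wall-clock time, so late-arriving data still lands in the correct partition. A sketch of how a `year=/month=/day=` prefix would be derived (illustrative only; the real connector is driven by its partitioner configuration):

```python
from datetime import datetime, timezone

# Sketch: deriving an S3 object prefix from a record timestamp, in the
# spirit of a time-based partitioner with timestamp.extractor=RecordField.
# The path layout here is illustrative.

def s3_prefix(bucket, topic, created_at_ms):
    ts = datetime.fromtimestamp(created_at_ms / 1000, tz=timezone.utc)
    return (f"s3://{bucket}/topics/{topic}/"
            f"year={ts.year:04d}/month={ts.month:02d}/day={ts.day:02d}/")

record = {"id": 7, "created_at": 1712102400000}  # 2024-04-03 00:00:00 UTC
print(s3_prefix("kafka-archive", "orders", record["created_at"]))
# s3://kafka-archive/topics/orders/year=2024/month=04/day=03/
```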
REST API Management
# List all connectors
curl http://localhost:8083/connectors
# Check a connector's status
curl http://localhost:8083/connectors/mysql-orders-cdc/status
# Pause a connector (without deleting it)
curl -X PUT http://localhost:8083/connectors/mysql-orders-cdc/pause
# Resume a connector
curl -X PUT http://localhost:8083/connectors/mysql-orders-cdc/resume
# Restart a failed task
curl -X POST http://localhost:8083/connectors/mysql-orders-cdc/tasks/0/restart
# Delete a connector
curl -X DELETE http://localhost:8083/connectors/mysql-orders-cdc
# List the available connector plugins
curl http://localhost:8083/connector-plugins | python3 -m json.tool
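The `/status` endpoint returns JSON describing the connector and each task, which makes it easy to script "restart every failed task". A small sketch — the payload below is a hand-written sample that mimics the response shape, and the helper function name is ours:

```python
import json

# Sketch: pick out tasks in FAILED state from a /connectors/<name>/status
# response, so they can be restarted via POST .../tasks/<id>/restart.
# The sample payload is hand-written to match the API's response shape.

def failed_task_ids(status):
    return [t["id"] for t in status.get("tasks", []) if t["state"] == "FAILED"]

sample = json.loads("""
{
  "name": "mysql-orders-cdc",
  "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    {"id": 1, "state": "FAILED",  "worker_id": "10.0.0.2:8083"}
  ]
}
""")

for task_id in failed_task_ids(sample):
    print(f"POST /connectors/{sample['name']}/tasks/{task_id}/restart")
# POST /connectors/mysql-orders-cdc/tasks/1/restart
```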
SMT (Single Message Transform)
SMTs apply lightweight per-record transformations inside the connector itself, with no need for a separate Kafka Streams application.
// Common SMT configuration examples
// 1. Add a static field (tag the data source)
"transforms": "addSource",
"transforms.addSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addSource.static.field": "data_source",
"transforms.addSource.static.value": "mysql-prod"
// 2. Drop specific fields (remove sensitive data)
"transforms": "dropPassword",
"transforms.dropPassword.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropPassword.blacklist": "password,credit_card"
// 3. Rename fields
"transforms": "renameFields",
"transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameFields.renames": "user_id:userId,created_at:createdAt"
// 4. Route records to a different topic (regex on the topic name)
"transforms": "routeByRegion",
"transforms.routeByRegion.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeByRegion.regex": "(.*)",
"transforms.routeByRegion.replacement": "$1-cn"
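Multiple SMTs can be chained, and the comma-separated `transforms` list fixes the order in which they run. A pure-Python sketch of chaining an insert-field step with a field rename (simplified stand-ins for `InsertField$Value` and `ReplaceField$Value`, not the Connect API):

```python
# Sketch: SMTs run in the order listed in "transforms". Here an
# InsertField-style step runs before a ReplaceField-style rename,
# so the inserted field already exists when the rename map is applied.

def insert_field(value, field, static_value):
    return {**value, field: static_value}

def rename_fields(value, renames):
    return {renames.get(k, k): v for k, v in value.items()}

record = {"user_id": 42, "created_at": "2024-04-03"}
chain = [
    lambda v: insert_field(v, "data_source", "mysql-prod"),
    lambda v: rename_fields(v, {"user_id": "userId", "created_at": "createdAt"}),
]
for transform in chain:
    record = transform(record)

print(record)
# {'userId': 42, 'createdAt': '2024-04-03', 'data_source': 'mysql-prod'}
```

Keep SMTs to light per-record work; joins, aggregations, or stateful logic belong in Kafka Streams.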
Hands-on: MySQL → Kafka → Elasticsearch End-to-End
Complete data pipeline
MySQL(binlog) ──Debezium──▶ mysql-prod.ecommerce.orders Topic
↓ (SMT unwrap: extract the new row state, strip CDC metadata)
{ id, user_id, amount, status, created_at }
↓
Elasticsearch Sink ──▶ orders index (full-text search, aggregations)
↓
S3 Sink ──▶ s3://kafka-archive/orders/year=2024/month=04/ (Parquet format)
MySQL prerequisites for Debezium: the binlog must be enabled (log_bin=ON) with binlog_format=ROW, and the Debezium user needs the REPLICATION SLAVE, REPLICATION CLIENT, and SELECT privileges. Use a dedicated Debezium account rather than root.
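A sketch of the corresponding MySQL setup (config fragment; the server_id, user name, and password are placeholders):

```sql
-- my.cnf (server side): enable row-based binlog
-- [mysqld]
-- log_bin       = mysql-bin
-- binlog_format = ROW
-- server_id     = 223344

-- Dedicated Debezium user with the privileges listed above
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz_password';
GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT
  ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
```

The REPLICATION privileges are global, so they must be granted ON *.*; check the Debezium documentation for your version, as some snapshot modes need additional privileges such as RELOAD.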