Chapter 07

Kafka Connect

A no-code data integration framework: building an end-to-end MySQL → Kafka → Elasticsearch data pipeline

Kafka Connect Framework Overview

Kafka Connect is Kafka's built-in, scalable data integration framework: it moves data between external systems and Kafka through configuration rather than custom code.

Kafka Connect data flow architecture

┌─────────────┐      ┌──────────────────────┐      ┌─────────────────┐
│  MySQL DB   │────▶│  Source Connector    │────▶│                 │
│ PostgreSQL  │────▶│  (Debezium CDC)      │────▶│                 │
│  MongoDB    │────▶│                      │────▶│  Kafka Cluster  │
│  S3 Files   │────▶│  JDBC Source         │────▶│                 │
└─────────────┘      └──────────────────────┘      │                 │
                                                   │                 │
┌─────────────┐      ┌──────────────────────┐      │                 │
│Elasticsearch│◀────│  Sink Connector      │◀────│                 │
│   S3/GCS    │◀────│                      │◀────│                 │
│    HDFS     │◀────│  JDBC Sink           │◀────│                 │
│ ClickHouse  │◀────│                      │◀────│                 │
└─────────────┘      └──────────────────────┘      └─────────────────┘

Deployment Modes

Mode         Configuration                                         Use case
Standalone   Single process, driven by properties files            Development/testing; small single-node workloads
Distributed  Multi-node, REST API managed, automatic failover      Production; high availability
# Start Standalone mode (development and debugging)
bin/connect-standalone.sh \
  config/connect-standalone.properties \
  config/connect-file-source.properties

# Start Distributed mode
bin/connect-distributed.sh config/connect-distributed.properties

# Core settings in connect-distributed.properties
bootstrap.servers=localhost:9092
# Connect cluster group id; workers sharing a group.id form one cluster
group.id=connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
# REST API port
rest.port=8083
# Directory containing connector plugin JARs
plugin.path=/opt/kafka/plugins

Common Connectors and Configuration

Debezium MySQL Source Connector (CDC)

Debezium implements change data capture (CDC) by reading MySQL's binlog, publishing every INSERT/UPDATE/DELETE as an event to a Kafka topic.

# Create a connector via the REST API (JSON config)
curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "mysql-orders-cdc",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-host",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.server.id": "1",
    "database.server.name": "mysql-prod",
    "database.include.list": "ecommerce",
    "table.include.list": "ecommerce.orders,ecommerce.users",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.mysql-prod",
    "snapshot.mode": "initial",
    "include.schema.changes": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}'

Elasticsearch Sink Connector

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "es-orders-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "3",
    "topics": "mysql-prod.ecommerce.orders",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "true",
    "batch.size": "1000",
    "flush.timeout.ms": "10000",
    "max.retries": "5",
    "retry.backoff.ms": "100",
    "transforms": "extractKey",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "id"
  }
}'

S3 Sink Connector (Data Archiving)

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "s3-orders-archive",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "2",
    "topics": "orders",
    "s3.region": "cn-north-1",
    "s3.bucket.name": "kafka-archive",
    "s3.part.size": "67108864",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "flush.size": "10000",
    "rotate.interval.ms": "3600000",
    "locale": "zh_CN",
    "timezone": "Asia/Shanghai",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "created_at"
  }
}'
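Time-based partitioning lays S3 objects out under time-bucketed prefixes derived from the record timestamp (created_at above), producing paths like year=2024/month=04/day=05. A rough Python illustration of how such an object-key prefix is formed; this is not the connector's actual implementation, and the timestamp is an arbitrary example:

```python
from datetime import datetime, timezone

def s3_object_prefix(topic: str, created_at_ms: int) -> str:
    """Illustrative TimeBasedPartitioner-style prefix for
    path.format 'year'=YYYY/'month'=MM/'day'=dd (UTC for simplicity;
    the connector honors its configured timezone)."""
    t = datetime.fromtimestamp(created_at_ms / 1000, timezone.utc)
    return f"{topic}/year={t.year:04d}/month={t.month:02d}/day={t.day:02d}/"

print(s3_object_prefix("orders", 1712300000000))  # orders/year=2024/month=04/day=05/
```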

REST API Management

# List all connectors
curl http://localhost:8083/connectors

# Check a connector's status
curl http://localhost:8083/connectors/mysql-orders-cdc/status

# Pause a connector (without deleting it)
curl -X PUT http://localhost:8083/connectors/mysql-orders-cdc/pause

# Resume a connector
curl -X PUT http://localhost:8083/connectors/mysql-orders-cdc/resume

# Restart a failed task
curl -X POST http://localhost:8083/connectors/mysql-orders-cdc/tasks/0/restart

# Delete a connector
curl -X DELETE http://localhost:8083/connectors/mysql-orders-cdc

# List the available connector plugins
curl http://localhost:8083/connector-plugins | python3 -m json.tool
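In day-to-day operations these endpoints are often scripted, for example to restart only the FAILED tasks of a connector. A minimal standard-library sketch, assuming Connect listens on localhost:8083; the sample payload mirrors the shape of the /connectors/<name>/status response:

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"   # assumed Connect REST endpoint

def failed_task_ids(status: dict) -> list[int]:
    """Extract the ids of FAILED tasks from a status payload."""
    return [t["id"] for t in status.get("tasks", []) if t["state"] == "FAILED"]

def restart_failed_tasks(name: str) -> None:
    """Fetch a connector's status and restart each FAILED task."""
    with urllib.request.urlopen(f"{CONNECT_URL}/connectors/{name}/status") as resp:
        status = json.load(resp)
    for task_id in failed_task_ids(status):
        req = urllib.request.Request(
            f"{CONNECT_URL}/connectors/{name}/tasks/{task_id}/restart",
            method="POST",
        )
        urllib.request.urlopen(req)

# Sample status payload (same shape as the REST API response):
sample = {
    "name": "mysql-orders-cdc",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "10.0.0.2:8083"},
    ],
}
print(failed_task_ids(sample))  # [1]
```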

SMT (Single Message Transform)

SMTs apply lightweight, per-record transformations inside the connector itself, with no separate Streams application required.

// Common SMT configuration snippets

// 1. Insert a static field (tag the data's origin)
"transforms": "addSource",
"transforms.addSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addSource.static.field": "data_source",
"transforms.addSource.static.value": "mysql-prod"

// 2. Drop sensitive fields (Kafka 2.7+ renames "blacklist" to "exclude")
"transforms": "dropPassword",
"transforms.dropPassword.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropPassword.blacklist": "password,credit_card"

// 3. Rename fields
"transforms": "renameFields",
"transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameFields.renames": "user_id:userId,created_at:createdAt"

// 4. Route to a different topic (RegexRouter rewrites the topic name;
//    here every topic gets a -cn suffix)
"transforms": "routeByRegion",
"transforms.routeByRegion.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeByRegion.regex": "(.*)",
"transforms.routeByRegion.replacement": "$1-cn"

Hands-on: An End-to-End MySQL → Kafka → Elasticsearch Pipeline

The complete data pipeline:

MySQL (binlog)
    │  Debezium
    ▼
mysql-prod.ecommerce.orders topic
    │  (SMT unwrap: extract the new row image, strip CDC metadata)
    │  { id, user_id, amount, status, created_at }
    ├──▶ Elasticsearch Sink ──▶ orders index (full-text search, aggregation analytics)
    └──▶ S3 Sink ──▶ s3://kafka-archive/orders/year=2024/month=04/ (Parquet format)

MySQL prerequisites for Debezium: the binlog must be enabled (log_bin=ON) with binlog_format=ROW, and the Debezium user needs the REPLICATION SLAVE, REPLICATION CLIENT, and SELECT privileges. Use a dedicated Debezium user rather than root.