Kafka Connect Source 与 Sink 原理
# Kafka Connect Source 与 Sink 原理
# 一、架构概述
Kafka Connect 是 Kafka 生态中专门解决数据集成问题的框架。它的定位很清晰:把外部系统的数据搬到 Kafka(Source),或者把 Kafka 的数据搬到外部系统(Sink)。在此之前,这两个方向都需要手写 Producer/Consumer 代码来处理连接管理、容错、并行、偏移提交等通用问题,Connect 把这些能力抽象成了配置驱动的标准化组件。
Connect 的架构围绕三个核心概念展开:
Kafka Connect Worker
├── Connector A (Source) ← 定义"从哪读"
│ ├── Task 0 ← 实际拉取数据
│ └── Task 1
├── Connector B (Sink) ← 定义"写到哪"
│ ├── Task 0 ← 实际写入数据
│ └── Task 1
└── Connector C (Source)
└── Task 0
2
3
4
5
6
7
8
9
Connector(连接器) 负责定义数据从哪来、到哪去。它是逻辑层面的抽象,负责两件事:管理 Task 的生命周期,以及决定如何把工作拆分成多个并行单元。例如一个 MySQL Source Connector 知道要连接哪台 MySQL、读取哪些表,但它本身不读取数据。
Task(任务) 是真正执行数据搬运的单元。一个 Connector 可以拆分成多个 Task,每个 Task 独立拉取或写入自己负责的那部分数据。Task 的数量决定了并行度,也决定了集群能利用多少个 Worker 节点。
Worker(工作节点) 是运行 Task 的进程。Worker 负责接收通过 REST API 提交的 Connector 配置、把 Task 调度到具体节点上、管理 Offset 的持久化存储。Worker 有两种运行模式:Standalone(单进程手动管理)和 Distributed(多节点集群自动故障转移),生产环境必须用后者。
一次完整的数据流转过程:
Source System (MySQL/PostgreSQL/MongoDB...)
│
▼
SourceTask.poll() ← 拉取数据,返回 List<SourceRecord>
│
▼
SMT 管道 ← 字段变换、topic 路由、过滤
│
▼
Converter ← 编码为 byte[](JSON/Avro/Protobuf)
│
▼
Kafka Topic ← 写入 Kafka
│
▼
SinkTask.put() / flush() ← 从 Kafka 消费,批量写入目标
│
▼
Target System (ES/MySQL/S3...)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Source 端,Connector 初始化后创建 SourceTask 实例,每个 Task 进入 poll 循环持续从源端拉取数据;拉到的结构化数据经过 SMT 管道做字段清洗、路由等处理;然后通过 Converter 序列化为字节写入 Kafka Topic。Sink 端则反向:从 Kafka 消费消息,经过 Converter 反序列化和 SMT 处理后,通过 SinkTask 的 put/flush 机制写入目标系统。
# 二、Source Connector:Poll 循环与 Offset 管理
# poll 是怎么工作的
Source Connector 的本质是拉模式 —— 它不是被动接收事件,而是主动向数据源索取数据。驱动整个流程的是 SourceTask 的 poll() 方法,Connect 框架在一个循环里反复调用它:
每次 poll 返回一批 SourceRecord,框架先让这批 record 穿过 SMT 管道做变换,再调用 Converter 序列化为字节,最后写入 Kafka。写入成功后,框架会提交这批 record 的 offset,然后立即开始下一轮 poll。如果某次 poll 返回了空列表,框架会按照 poll.interval.ms 等待后重试,而不是认为数据已经读完(因为 Source 的数据流在概念上是无限的)。
这里有一个重要的设计:只有在 offset 成功提交之后,poll 才会继续。如果提交失败,说明 Kafka 没有确认这批数据,重启后框架会从上次成功提交的位置重新 poll。这正是 at-least-once 语义的由来 —— 一批数据可能被写入 Kafka 两次,但绝不会漏掉。
# Offset 管理的是源端位置
和普通 Kafka Consumer 不同,Source Connector 的 offset 存储的不是 Kafka 的 consumer offset,而是源端的读取位置。例如对于 Debezium MySQL Connector,offset 记录的是 binlog 文件名和位点(mysql-bin.000003:1258943);对于 JDBC Source Connector,记录的可能是自增主键的值。
Connect 框架用两个概念来描述这个位置:SourcePartition 唯一标识数据源中的一个分区(哪台机器、哪个数据库、哪张表),SourceOffset 记录在这个分区中的读取进度。重启时框架从 offset 存储中读出每个分区的位置,传给 SourceTask 的 start() 方法,Task 就知道从哪里接上。
Offset 默认存储在 Kafka 的内部 topic 里(connect-offsets),这意味着 offset 本身也享受 Kafka 的持久化和复制保障。
# 分区策略决定并行度
Connector 的并行度不是简单的"配置几个 Task 就几并行"。tasks.max 只是上限,实际 Task 数量还要受数据源本身分区数的制约:实际 Task 数 = min(maxTasks, 数据源可分区的数量)。
例如 MySQL 的 binlog 是单一日志流,天然无法拆分,所以 Debezium MySQL Connector 的 Task 数永远是 1。而 MongoDB 按 shard 拆分或者 JDBC Connector 按表拆分时,每个分区可以分配独立的 Task,这时增加 tasks.max 才真正提升并行度。
这个约束意味着 Source Connector 的吞吐天花板往往不在于 Connect 框架本身,而在于数据源的分区设计。
# CDC 的特殊之处
CDC(Change Data Capture)类 Source Connector 是最常见的生产场景,以 Debezium 为典型代表。它的工作分两个阶段:
MySQL Binlog
│
▼
Debezium Connector
│
├── Phase 1: Snapshot(首次全量)
│ ├── SELECT * FROM table 分页扫描
│ └── 记录 snapshot 完成时的 binlog 位点
│
└── Phase 2: Streaming(增量持续)
├── 从 snapshot 位点开始消费 binlog
└── 实时推送变更事件
2
3
4
5
6
7
8
9
10
11
12
CDC 产生的数据有一个 Envelope 结构,每条消息包含 before(变更前值)、after(变更后值)、op(操作类型:c/u/d)以及 source 元数据。这个 Envelope 是后面所有数据处理流的起点 —— 下游通常只需要 after 里的数据,析构 Envelope 是 SMT 最典型的应用场景。
# 三、Sink Connector:put/flush 与投递保证
Sink 端职责是把 Kafka 的数据写入外部系统,核心是一对方法:put() 和 flush()。两者的分工很明确:put 负责接收数据(通常只是缓冲到内存),flush 负责真正把数据落地到目标系统。
框架每次从 Kafka 消费一批消息后调用一次 put,put 把这些 SinkRecord 加入内部 buffer。框架可以在调用多次 put 之后才调用一次 flush —— 具体频率取决于 offset.flush.interval.ms 等配置。flush 返回成功后,框架才提交 Kafka consumer offset。
这个时序是关键:如果 flush 成功但 offset 提交失败,或者 flush 失败抛异常,offset 都不会推进。重启后从上次成功提交的位置重新消费,同一批数据再次写入目标系统。所以 Sink Connector 默认提供 at-least-once 语义,写入可能重复,但不会遗漏。
更高要求的场景可以做到 exactly-once,前提是目标系统支持事务 —— 将目标写入和 Kafka offset 提交放入同一个分布式事务中,两边同时成功或回滚。
# 幂等性是 Sink 端的必备设计
既然 at-least-once 允许重复写入,Sink 端就必须处理幂等。最直接的方式是利用 Kafka 消息自带的元数据构建唯一标识:每条消息的 topic + partition + offset 天然唯一,用它作为目标系统的 upsert key,重复写入就变成了无副作用的覆盖操作。
举个例子,写入 Elasticsearch 时用 topic-partition-offset 作为 document id,用 upsert 而非 create,同一消息无论写入几次结果都一样。
# 执行模型的选择
大部分 Sink Connector 的实现都是一个"先 put 缓冲、再 flush 批量提交"的模板。put 阶段做轻量操作(校验、转换、入队),flush 阶段做重量操作(网络 IO、事务提交)。这种设计既保证了吞吐(批量提交减少网络往返),又利用 flush 返回与否来控制 offset 推进的边界,是 Sink 端一致性的基石。
# 四、SMT 与 Converter:数据变换的两层模型
Kafka Connect 的数据处理管道中有两个容易混淆的环节:SMT(Single Message Transform)和 Converter。它们都在 Source/Sink 的路径上工作,但职责完全不同,混淆它们会导致架构设计跑偏。
# 职责边界
SMT 工作在业务语义层,处理的是结构化数据的内容和结构 —— 字段改名、结构调整、topic 路由、数据过滤,都是 SMT 的范畴。SMT 面对的是 Connect 内部的 SourceRecord/SinkRecord,看得见字段名和字段值。
Converter 工作在传输表示层,负责 Struct 和 byte[] 之间的转换 —— 数据在 Kafka 里存成什么格式(JSON/Avro/Protobuf)、有没有 schema、schema 怎么演进,都是 Converter 的事。Converter 不关心字段的业务含义,只关心编码和协议。
用一个简单的判断标准:如果操作的是"数据长什么样",那是 SMT;如果操作的是"数据怎么编码",那是 Converter。
# SMT 的七种典型能力
字段级处理是最日常的使用场景。ReplaceField 可以删除隐私字段(脱敏场景)、重命名字段(适配下游模型)、或者做白名单/黑名单过滤(减少 Kafka payload)。例如剔除 password、ssn 等敏感列,把几十列的表精简到下游真正需要的几列。
结构转换在 Debezium 场景下几乎必用。Debezium 产出的数据是带 before/after/source 的 Envelope,下游消费方通常只需要 after 里的纯业务数据。ExtractNewRecordState 这个 SMT 专职做展平:把嵌套的 Envelope 拆成扁平的字段集合。
Topic 路由通过 RegexRouter 实现动态分流:根据 topic 名称匹配规则,把数据写入不同的目标 topic。多环境隔离(同一个 Connector 配置分别写入 dev/prod topic)、多租户拆分都靠它。
增强字段用 InsertField 在数据流中附加元信息。常见的是插入数据来源标识(data_source=mysql-prod)、处理时间戳、或 trace_id。这类字段对下游做数据血缘追踪和链路排查很有用。
Key 重写用 ValueToKey 把 value 中的部分字段提升为 Kafka message key,从而影响分区策略。比如把 user_id 设为 key,同一个用户的所有变更就会落入同一个分区,保证消费者侧的有序处理。
条件过滤按规则丢弃不需要的数据:只保留部分表的变更、过滤掉 delete 事件、或者跳过不满足条件的记录。注意过滤发生在写入 Kafka 之前,被丢弃的数据不会占用 Kafka 存储。
自定义 SMT 是灵活性最高的选择。实现 Transformation 接口后可以在 apply 方法里做任何事 —— JSON 拆解、多字段拼接计算、数据规范化,甚至可以 return null 来丢弃整条消息。但要注意 SMT 是同步链路的一部分,复杂逻辑会直接拖慢吞吐,重的 ETL 应该放到下游流处理框架而不是 SMT 里。
# Converter 的六种能力
格式选择是 Converter 最基本的职责。JSON 最简单直观,适合调试和低频场景;Avro 自带 schema 且体积小 30%-50%,是生产环境的主流选择;Protobuf 体积最小,适合极致压缩的高吞吐链路。
Schema 管理配合 Confluent Schema Registry 使用,解决了"共享 topic 的消费方如何知道数据格式"这个核心问题。Schema Registry 集中管理 Avro/Protobuf 的 schema 版本,支持向前/向后/全兼容的演进策略,是多团队协作时的数据契约层。
压缩与体积优化是高吞吐场景的核心收益。Avro 和 Protobuf 比 JSON 显著更小,节省的不只是存储空间,更是网络带宽和 broker 的 IO 压力。
类型系统约束是 Avro/Protobuf 相对 JSON 的重要优势:schema 在写入时就校验字段类型,脏数据在进入 Kafka 之前就被拦截,而不是等到下游消费时才发现。
跨语言兼容让不同技术栈的消费方都能正确解析数据。Java 生产者用 Avro 序列化,Go 和 Python 消费者同样可以无歧义地反序列化。
自定义 Converter 用于特殊场景:加密序列化(安全合规)、私有二进制协议(存量系统对接)、带压缩的 Converter(进一步减少体积)。
# 能力边界速查
有一类操作经常被问"该用 SMT 还是 Converter 做":
- 改字段值、改字段结构、改 topic、加字段、控制分区 key —— 这些都是 SMT 的活
- 改数据格式(JSON/Avro)、管理 schema、压缩/编码 —— 这些都是 Converter 的活
记住两条红线:SMT 不能变重逻辑,ETL 放 Connect 外面做;Converter 不能承载业务语义,它不知道自己编码的那些字段叫什么、代表什么。
# 生产中的组合使用
一个典型的 Debezium 生产配置会把两者串联起来:
SMT 侧做 unwrap(展平 Envelope)+ route(topic 路由),Converter 侧用 Avro + Schema Registry。实际的执行顺序是:binlog 数据先经过 SMT 变成干净的业务字段,再经过 Converter 编码为 Avro 字节,最后写入 Kafka。SMT 处理数据内容,Converter 处理数据格式,两层各司其职。
# 五、Worker 模型与集群机制
Distributed 模式是唯一适合生产使用的 Worker 模式。多个 Worker 节点组成集群,通过 Kafka 自身的几个内部 topic 完成协调:
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker 3 │
│ │ │ │ │ │
│ Connector-A │ │ Connector-B │ │ Connector-C │
│ ├─ Task-0 │ │ ├─ Task-0 │ │ ├─ Task-0 │
│ └─ Task-1 │ │ └─ Task-1 │ │ └─ Task-1 │
└──────────┬───────────┘ └──────────┬───────────┘ └──────────┬───────────┘
│ │ │
└─────────────────────────┼─────────────────────────┘
│
┌───────────┴───────────┐
│ Kafka 内部 Topic │
│ │
│ config topic │ ← Connector 配置
│ status topic │ ← Task 运行时状态
│ offset topic │ ← 持久化 Offset
└───────────────────────┘
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
config topic 存储所有 Connector 的配置(相当于集群的期望状态),status topic 存储每个 Task 的运行时状态,offset topic 持久化所有 Connector 的读取进度。
当新 Worker 加入或某个 Worker 宕机时,集群自动触发 rebalance:Task 在线存活的 Worker 之间重新分配。rebalance 是优雅停机的 —— 旧 Task 先完成 flush 后 stop,新 Task 再从 offset 恢复 start。整个过程在 at-least-once 语义范围内保证数据不丢。
集群的日常操作通过 REST API 进行:创建 Connector、查看状态、暂停恢复、重启,全部通过 HTTP 接口完成,不需要登录 Worker 机器修改配置文件。这也是 Connect 能被编排进自动化数据管道的重要原因。
# 六、错误处理与容错
Connect 的错误处理由 errors.tolerance 控制。默认值是 none,任何一条消息处理失败都导致 Task 停止。改为 all 则所有错误被忽略继续处理,但这意味着静默丢数据,非常危险。
生产环境的标准做法是配合 Dead Letter Queue:errors.tolerance=all 的同时配置 errors.deadletterqueue.topic.name,失败的消息被写入一个专门的 DLQ topic,保留原始数据、失败原因、来源信息。事后可以从 DLQ 中消费这些消息做人工补偿或重放,既不影响主链路运行,也不会丢数据。
不同环节的异常表现不同:Source poll 抛异常时 Task 自动重试(可配置重试次数),通常是源端网络问题需要排查;Sink put/flush 抛异常导致 Task 失败且 offset 回滚,往往是目标系统负载过高需要扩容;Converter 抛异常意味着 schema 不兼容或数据格式错误;SMT 异常可以根据 errors.tolerance 决定丢弃或停 Task。
# 七、典型配置参考
# Debezium MySQL Source
一个标准的 MySQL CDC → Kafka 配置:
name=mysql-source
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
# MySQL 连接
database.hostname=mysql
database.port=3306
database.user=debezium
database.password=debezium
database.server.id=184054
database.server.name=my_mysql
# 表白名单
table.include.list=mydb.users,mydb.orders
# SMT
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# Converter
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter.schema.registry.url=http://schema-registry:8081
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Debezium 的 tasks.max 固定为 1(binlog 单流),SMT 使用 ExtractNewRecordState 展平 Envelope,Converter 使用 Avro 配合 Schema Registry 做 schema 治理。
# Kafka → Elasticsearch Sink
name=es-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=2
connection.url=http://es:9200
# Topic → Index 映射
topics=mysql.users,mysql.orders
topic.index.map=mysql.users:users_index,mysql.orders:orders_index
# 幂等写入
key.ignore=false
write.method=upsert
# 死信队列
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq.es-sink
# Converter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
这里 write.method=upsert 配合 key.ignore=false 实现了幂等写入 —— 用 Kafka message key 作为 ES document id,重复写入执行 upsert 而非 insert。同时配置了 DLQ 兜底,防止个别异常消息阻塞整条链路。
# 八、工程场景映射
实时数仓:Source 层用 Debezium 捕获 MySQL 变更,SMT 做 Envelope 展平和字段清洗,Converter 用 Avro + Schema Registry 统一 schema 治理,Sink 层通过 JDBC Sink 或自定义 Connector 写入 ClickHouse/TiDB。这是最完整的全链路 CDC 范式。
多机房同步:SMT 侧用 RegexRouter 做 topic 按机房重写(如 topic → topic.dc1),Converter 用 JSON 降低跨机房协调成本(不需要 Schema Registry 跨机房部署)。
数据治理平台:SMT 侧用自定义脱敏逻辑(手机号、身份证 mask),Converter 用 Avro + Schema Registry 强制 schema 校验,让数据契约不可绕过。
高吞吐链路:SMT 做字段精简(ReplaceField 只保留必要列),Converter 改用 Protobuf 追求最小网络开销。CPU 成本换带宽成本的典型 trade-off。
# 九、总结
- Source Connector 的核心是 poll 循环 + 源端 offset 管理。理解 offset 提交时机是理解 at-least-once 的关键。
- Sink Connector 的核心是 put/flush 协作 + offset 提交边界。flush 成功才提交 offset,这个时序是 Sink 端一致性的基石。幂等写入是 Sink 端的标配设计。
- SMT 做数据内容和结构的变换(字段、topic、过滤),Converter 做数据编码和协议(JSON/Avro/Protobuf、schema)。复杂逻辑放 SMT 但别让它变重,复杂协议放 Converter 但别让它承载业务语义。
- 生产环境必须用 Distributed 模式 + REST API 管理,配合 DLQ 兜底,避免静默丢数据。