Debezium产生的消息结构确实与 JDBC sink 所期望的不同。JDBC 接收器期望消息中的每个字段对应于行中的一个字段,因此消息对应于行的“之后”状态。OTOH,Debezium MySQL 连接器执行更改数据捕获,这意味着它不仅仅包含行的最新状态。具体来说,连接器输出消息,其键包含行的主键或唯一键列,消息值包含信封结构,其中:
- 操作,例如是插入、更新还是删除
- 更改发生前行的状态(插入时为空)
- 更改发生后行的状态(删除时为空)
- 特定于源的信息,包括服务器元数据、事务 ID、数据库和表名称、事件发生时的服务器时间戳以及有关事件在何处找到的详细信息等。
- 连接器生成事件的时间戳
解决这种差异的最简单方法是使用 Kafka 0.10.2.x(当前最新版本是 0.10.2.1)和 Kafka Connect 的新Single Message Transforms (SMTs)。每个 Kafka Connect 连接器都可以配置零个或多个 SMT 链,这些 SMT 链可以在将消息写入 Kafka 之前转换源连接器的输出,或者在将消息作为输入传递到接收器连接器之前转换从 Kafka 读取的消息。SMT 故意非常简单,处理单个消息,绝对不应该访问外部资源或维护任何状态,因此不能替代Kafka Streams或其他更强大的流处理系统,可以连接多个输入流,并且可以执行非常复杂的操作并跨多个消息维护状态。
如果您使用 Kafka Streams 进行任何类型的处理,那么您应该考虑在您的 Kafka Streams 应用程序中操作消息结构。如果没有,那么 SMT 是解决问题的好方法。实际上,有两种方法可以使用 SMT 来调整消息结构。
第一种选择是使用带有 Debezium 连接器的 SMT 来提取/保留行的“之后”状态,并在将其写入 Kafka 之前丢弃所有其他信息。当然,您将在 Kafka 主题中存储更少的信息,并丢弃一些未来可能有价值的 CDC 信息。
第二个也是 IMO 首选的选项是让源连接器保持原样并将所有 CDC 消息保留在 Kafka 主题中,然后使用带有接收器连接器的 SMT 来提取/保留行的“之后”状态并在消息传递到 JDBC 接收器连接器之前丢弃所有其他信息。您可能可以使用 Kafka Connect 中包含的现有 SMT 之一,但您可以考虑编写自己的 SMT 来完全按照您的意愿行事。