1

I have problem of capturing data in mysql with debezium change data capture and consuming it to another mysql using kafka connect jdbc sink.

Because the schema and payload that debezium produces to kafka topic is not compatible with the schema that kafka connect jdbc sink expects.

I get exception when jdbc sink wants to consume data and create records in another mysql.

How should I solve this problem ?

4

1 回答 1

7

Debezium产生的消息结构确实与 JDBC sink 所期望的不同。JDBC 接收器期望消息中的每个字段对应于行中的一个字段,因此消息对应于行的“之后”状态。OTOH,Debezium MySQL 连接器执行更改数据捕获,这意味着它不仅仅包含行的最新状态。具体来说,连接器输出消息,其键包含行的主键或唯一键列,消息值包含信封结构,其中:

  • 操作,例如是插入、更新还是删除
  • 更改发生前行的状态(插入时为空)
  • 更改发生后行的状态(删除时为空)
  • 特定于源的信息,包括服务器元数据、事务 ID、数据库和表名称、事件发生时的服务器时间戳以及有关事件在何处找到的详细信息等。
  • 连接器生成事件的时间戳

解决这种差异的最简单方法是使用 Kafka 0.10.2.x(当前最新版本是 0.10.2.1)和 Kafka Connect 的新Single Message Transforms (SMTs)。每个 Kafka Connect 连接器都可以配置零个或多个 SMT 链,这些 SMT 链可以在将消息写入 Kafka 之前转换源连接器的输出,或者在将消息作为输入传递到接收器连接器之前转换从 Kafka 读取的消息。SMT 故意非常简单,处理单个消息,绝对不应该访问外部资源或维护任何状态,因此不能替代Kafka Streams或其他更强大的流处理系统,可以连接多个输入流,并且可以执行非常复杂的操作并跨多个消息维护状态。

如果您使用 Kafka Streams 进行任何类型的处理,那么您应该考虑在您的 Kafka Streams 应用程序中操作消息结构。如果没有,那么 SMT 是解决问题的好方法。实际上,有两种方法可以使用 SMT 来调整消息结构。

第一种选择是使用带有 Debezium 连接器的 SMT 来提取/保留行的“之后”状态,并在将其写入 Kafka 之前丢弃所有其他信息。当然,您将在 Kafka 主题中存储更少的信息,并丢弃一些未来可能有价值的 CDC 信息。

第二个也是 IMO 首选的选项是让源连接器保持原样并将所有 CDC 消息保留在 Kafka 主题中,然后使用带有接收器连接器的 SMT 来提取/保留行的“之后”状态并在消息传递到 JDBC 接收器连接器之前丢弃所有其他信息。您可能可以使用 Kafka Connect 中包含的现有 SMT 之一,但您可以考虑编写自己的 SMT 来完全按照您的意愿行事。

于 2017-05-15T14:17:50.580 回答