0

我对 Kafka 连接很陌生

我将来自多个来源的记录插入到一​​个表中。在某些情况下,某些记录可能会先于其他记录到达。由于我无法控制哪个源将首先提取哪个记录,因此我想对记录的时间戳键添加一个检查。

我的模式中有一个名为“LastModified_timestamp”的键,用于存储我的记录的最新状态的时间戳。

我想向我的 JDBC 接收器连接器添加一个检查,在该连接器中我可以根据比较 LastModified_timestamp 的值来更新记录

我想忽略时间戳较旧的记录,只想更新/插入最新的记录。我找不到任何配置来实现这一点

有什么方法可以实现这一目标吗?在这种情况下编写自定义查询会有所帮助吗?

4

1 回答 1

0

JDBC Sink 连接器不支持这种功能。您有两种选择:

  • 单消息转换 (SMT) - 这些在通过 Kafka Connect 时将逻辑应用于记录。SMT 非常适合删除列、更改数据类型等。不适用于更复杂的处理和逻辑,包括需要像您在此处那样跨越多个记录的逻辑

  • 首先处理源 Kafka 主题中的数据,以应用必要的逻辑。您可以使用 Kafka Streams、KSQL 和其他几个流处理框架(例如 Spark、Flink 等)来做到这一点。如果记录比已经处理的更旧,您需要某种有状态的逻辑来解决。

您能否详细描述一下您的上游数据来源?可能有更好的方法来编排传入的数据以强制执行排序。

最后一个想法是将所有记录放到目标数据库中,然后使用数据库查询中的逻辑使用它来选择给定键的最新(基于LastModified_timestamp)记录。

免责声明:我在开源 KSQL 项目背后的公司 Confluent 工作。

于 2018-10-10T08:34:33.483 回答