0

我在我的应用程序中实现了一个 KTable 和一个 KStream 连接,并在以下情况下期望输出消息;

  1. KStream 中有一条新消息,并且 KTable 中有一条匹配记录
  2. KStream 中有一条更新的消息,并且 KTable 中有一条匹配的记录
  3. 当 KTable 记录有更新时

我一直在观察我的应用程序按预期执行 1 和 2,但不是 3。

有什么建议可以帮助我实现第 3 点吗?

谢谢!

4

1 回答 1

0

您正在寻找一个表到表连接。当新记录到达任一主题时,这会触发输出消息。

首先确保要跨主题连接的记录具有相同的键并且是共同分区的。然后将这两个主题作为 KTables 阅读。

KTable<String, Long> left = ...;
KTable<String, Double> right = ...;

最后,在两个表上执行连接

KTable<String, String> joined = left.join(right, (x, y) -> "left=" + x + ", right=" + y);

https://docs.confluent.io/platform/current/streams/javadocs/javadoc/org/apache/kafka/streams/kstream/KTable.html#join-org.apache.kafka.streams.kstream.KTable-org。 apache.kafka.streams.kstream.ValueJoiner-

于 2021-03-29T17:29:44.213 回答