1

我有一个输入流,我用它来创建一个 ktable。然后我使用 toStream() 方法使用 ktable 更改日志创建一个输出流。问题是 toStream() 方法创建的流不包含来自已更新我的 KTable 的输入流的所有消息。这是我的代码:

final KTable<String, event> KTable = inputStream.groupByKey().aggregate(() -> null,
      aggregateKtableMethod,
      storageConf);

KStream<String, event> outputStream = KTable.toStream();

我想为 inputStream 中的每条消息在 outputStream 中获取一条消息。对于大多数消息,它运行良好,但在特定情况下我会丢失一些事件:如果我在很短的时间间隔内(少于 5 秒)收到 2 条具有相同密钥的消息。在这种情况下,我只收到 outputStream 中的第二个事件。

我认为这是因为 Ktable 更新是通过一些批处理操作进行的,但我找不到任何与之相关的配置或文档。是这些丢失事件的原因吗?您知道如何更改配置以便我不会丢失任何消息吗?

4

1 回答 1

0

我找到了解决方案。问题出在我用来创建 ktable 的“storageConf”中,缓存是可以的。我只需要使用以下功能禁用它:

storageConf.withCachingDisabled();

final KTable<String, event> KTable = inputStream.groupByKey().aggregate(() -> null,
  aggregateKtableMethod,
  storageConf);

现在我的所有事件都在输出流中。

于 2021-08-02T14:50:29.837 回答