我有一个输入流,我用它来创建一个 ktable。然后我使用 toStream() 方法使用 ktable 更改日志创建一个输出流。问题是 toStream() 方法创建的流不包含来自已更新我的 KTable 的输入流的所有消息。这是我的代码:
final KTable<String, event> KTable = inputStream.groupByKey().aggregate(() -> null,
aggregateKtableMethod,
storageConf);
KStream<String, event> outputStream = KTable.toStream();
我想为 inputStream 中的每条消息在 outputStream 中获取一条消息。对于大多数消息,它运行良好,但在特定情况下我会丢失一些事件:如果我在很短的时间间隔内(少于 5 秒)收到 2 条具有相同密钥的消息。在这种情况下,我只收到 outputStream 中的第二个事件。
我认为这是因为 Ktable 更新是通过一些批处理操作进行的,但我找不到任何与之相关的配置或文档。是这些丢失事件的原因吗?您知道如何更改配置以便我不会丢失任何消息吗?