1

我收到“事件已删除,晚了 5051 毫秒。”

我应该如何构建处理所有事件的管道,无论它们是否迟到。

我尝试了几种方法。基本上,我尝试的是

  1. 如果没有窗口,我没有收到迟到的事件,但这不适用于我,因为并行执行和接收器中的值被覆盖而不是合并。
  2. 因此我使用了窗口,它解决了我最重要的问题,但导致了迟到的事件。
  3. 接下来,我尝试使用不带时间戳的窗口,这引发了必须定义时间戳的异常。

基本上我在这里有两个问题:1)如何将新事件合并到接收器中的现有事件 2)而不丢弃事件或覆盖。

代码:

WindowDefinition customWindow = WindowDefinition.sliding(60000, 30000);
customWindow.setEarlyResultsPeriod(1000);

StreamStage<Map.Entry<...>> updatedState = p
                .drawFrom(<source>)
                .withIngestionTimestamps()
                .groupingKey(...)
                .window(customWindow)
                .aggregate(AggregateOperations.toCollection(ArrayList::new))
                .mapUsingIMap(...)
                .sink(...)
4

0 回答 0