我收到“事件已删除,晚了 5051 毫秒。”
我应该如何构建处理所有事件的管道,无论它们是否迟到。
我尝试了几种方法。基本上,我尝试的是
- 如果没有窗口,我没有收到迟到的事件,但这不适用于我,因为并行执行和接收器中的值被覆盖而不是合并。
- 因此我使用了窗口,它解决了我最重要的问题,但导致了迟到的事件。
- 接下来,我尝试使用不带时间戳的窗口,这引发了必须定义时间戳的异常。
基本上我在这里有两个问题:1)如何将新事件合并到接收器中的现有事件 2)而不丢弃事件或覆盖。
代码:
WindowDefinition customWindow = WindowDefinition.sliding(60000, 30000);
customWindow.setEarlyResultsPeriod(1000);
StreamStage<Map.Entry<...>> updatedState = p
.drawFrom(<source>)
.withIngestionTimestamps()
.groupingKey(...)
.window(customWindow)
.aggregate(AggregateOperations.toCollection(ArrayList::new))
.mapUsingIMap(...)
.sink(...)