我正在对窗口流执行聚合并希望抑制早期聚合结果。早期结果是指在窗口结束之前计算的结果,而不是在宽限期内发生的结果。因此,我想抑制所有带有时间戳<窗口结束的聚合结果,但转发所有时间戳> =窗口结束和时间戳<窗口关闭的记录。
最小的 Kafka Streams 拓扑示例:
new StreamsBuilder()
.stream("my-topic")
.windowedBy(TimeWindows.of(myWindowSize).grace(myGracePeriod))
.reduce(myReducer)
.suppress( /* searched for*/ )
.toStream();
因此,Suppressed.untilWindowCloses( .. )这对我来说不是一个选择,因为我必须等到宽限期到期,这可能很长。
根据KIP-328,可以使用 as 获得所需的行为Suppressed.untilTimeLimit(Duration.ZERO, .. )(引自 KIP 的描述):
一个。发射前等待更多更新的时间。这是一个时间量,从事件时间(对于常规 KTables)或从窗口结束(对于窗口化 KTables)测量,在将每个键发送到下游之前缓冲每个键。
然而, Kafka Streams JavaDoc以及相应的实现意味着情况并非如此,并且时间限制在接收每个(窗口)键的第一条记录时开始倒计时,而不是在窗口结束时开始倒计时。
我很高兴对此进行澄清并支持如何实现所需的行为。