KStream<Integer, Integer> stream;
KGroupedStream<Integer, Integer> grouped = stream.groupByKey();
KTable<Integer, Integer> aggregated = grouped.aggregate(
() -> 0,
(k, i, agg) -> {
if (agg == null)
agg = 0;
Integer sum = agg + i;
return sum > 100 ? null : sum;
});
我的信息流上的消息是:
- (1, 50)
- (1, 75)
- (1, 50)
当第二条消息到达时,聚合器返回 null。是否KTable aggregated接收 (1, null) 并删除 key=1 的状态?
message#3 到达时为aggnull 还是再次调用 Initializer 并设置agg为 0?
如果我使用reduce而不是聚合,如果Reducer返回null,下一条消息将通过Reducer还是会像组中的第一条消息一样“按原样”使用?
谢谢,大卫