1
KStream<Integer, Integer> stream;
KGroupedStream<Integer, Integer> grouped = stream.groupByKey();
KTable<Integer, Integer> aggregated = grouped.aggregate(
    () -> 0,
    (k, i, agg) -> {
       if (agg == null)
         agg = 0;
       Integer sum = agg + i;
       return sum > 100 ? null : sum;
    });

我的信息流上的消息是:

  1. (1, 50)
  2. (1, 75)
  3. (1, 50)

当第二条消息到达时,聚合器返回 null。是否KTable aggregated接收 (1, null) 并删除 key=1 的状态?

message#3 到达时为aggnull 还是再次调用 Initializer 并设置agg为 0?

如果我使用reduce而不是聚合,如果Reducer返回null,下一条消息将通过Reducer还是会像组中的第一条消息一样“按原样”使用?

谢谢,大卫

4

1 回答 1

3

当第二条消息到达时,聚合器返回 null。KTable 聚合是否接收 (1, null) 并删除其 key=1 的状态?

是的。

message#3 到达时 agg 为 null 还是再次调用 Initializer 并将 agg 设置为 0?

再次调用初始化程序。

如果我使用reduce而不是聚合,如果Reducer返回null,下一条消息将通过Reducer还是会像组中的第一条消息一样“按原样”使用?

减少像聚合一样的工作。因此,如果您返回null,则将处理以下消息,就好像它是第一条消息一样。

元评论:你为什么不直接运行代码并尝试一下???

于 2018-10-08T00:09:48.033 回答