0

我们正在使用windowedByaggregate操作KStream来生成一个KTable. 然后我们将其转换KTableKStreamusingtoStream方法 on KTable。我们在这个新KStream版本中所期待的是对KTable.

// MyAggregation is our class that aggregates incoming message according to some business logic
// stream is setup to read from a incoming topic
KTable<Windowed<String>, MyAggregation> table = stream
    .windowedBy(TimeWindows.of(Duration.ofMillis(60000).grace(1000))
    .aggregate(MyAggregation::new, (key, message, aggregation) -> {          
      aggregation.aggregateMessage(message);
      return aggregation;
    }, Materialized.<String, MyAggregation, WindowStore<Bytes, byte[]>>as(
        "aggregation")
        .withValueSerde(myAggregationSerdes));

return table.toStream().map((key, value) -> {
  // Remove start/end time from key as this stream is published to a different topic
  // that needs to consume by key
  System.out.println("Aggregation done for:" + key);
  return new KeyValue<>(key.key(), value);
});

然而,我们注意到该Aggregation done for语句只在窗口期间打印一次,即使我们在给定窗口中针对同一个键输入了多条消息。请注意,我们没有使用suppress运算符仅发布最后一次更新。有人可以帮我理解为什么我们没有看到每个更新都发布到结果KStream吗?是否有任何最小时间间隔(或缓冲区大小)来缓冲更新然后发布?我们注意到的另一件事是,如果窗口更大(比如 5 分钟或更长时间),那么我们确实会看到中间更新正在发布。然而,这些并不是每一个更新。任何帮助/指针都非常感谢。

谢谢!

4

0 回答 0