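We are using the windowedBy and aggregate operations on a KStream to produce a KTable. We then convert that KTable back into a KStream using the toStream method on KTable. What we expect to see in this new KStream is every update made to the KTable.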
// MyAggregation is our class that aggregates incoming messages according to some business logic.
// stream is set up to read from an incoming topic; it is assumed to be already grouped by key
// (a KGroupedStream), since windowedBy is defined on grouped streams.
KTable<Windowed<String>, MyAggregation> table = stream
    // 1-minute tumbling windows with a 1-second grace period; grace() is called on the
    // TimeWindows object and takes a Duration
    .windowedBy(TimeWindows.of(Duration.ofMillis(60000)).grace(Duration.ofMillis(1000)))
    .aggregate(MyAggregation::new, (key, message, aggregation) -> {
        aggregation.aggregateMessage(message);
        return aggregation;
    }, Materialized.<String, MyAggregation, WindowStore<Bytes, byte[]>>as("aggregation")
        .withValueSerde(myAggregationSerdes));
return table.toStream().map((key, value) -> {
    // Remove the window start/end time from the key, as this stream is published to a
    // different topic that needs to be consumed by key
    System.out.println("Aggregation done for:" + key);
    return new KeyValue<>(key.key(), value);
});
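However, we notice that the "Aggregation done for" statement is printed only once per window, even though we feed in multiple messages for the same key within a given window. Note that we are not using the suppress operator to publish only the last update. Can someone help me understand why we do not see every update published to the resulting KStream? Is there a minimum time interval (or buffer size) for which updates are buffered before being published?

Another thing we noticed is that if the window is larger (say 5 minutes or more), we do see intermediate updates being published. However, these are still not every update. Any help/pointers are much appreciated.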
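For anyone reproducing this, here is a minimal sketch of the two Kafka Streams settings that look relevant to the buffering question, assuming the record cache in front of the "aggregation" store is what coalesces the updates. The class and method names (CacheSettingsSketch, emitEveryUpdate) are hypothetical, and the 1000 ms commit interval is only an illustrative value. Plain string keys are used so the snippet compiles without the Kafka Streams jar; they correspond to StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG and StreamsConfig.COMMIT_INTERVAL_MS_CONFIG.

```java
import java.util.Properties;

public class CacheSettingsSketch {

    // Builds the subset of streams properties that governs how long KTable updates
    // may be held back before being forwarded downstream.
    static Properties emitEveryUpdate() {
        Properties props = new Properties();
        // A cache size of 0 bytes disables the record cache, so updates to the same
        // key are no longer merged before being forwarded.
        props.setProperty("cache.max.bytes.buffering", "0");
        // With caching enabled, the cache is flushed on commit (or when it fills up);
        // a shorter commit interval therefore yields more frequent intermediate updates.
        // 1000 ms is an assumed, illustrative value.
        props.setProperty("commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings; in a real application they would be merged into the
        // Properties passed to the KafkaStreams constructor.
        emitEveryUpdate().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

If the cache is the cause, the default commit interval of 30 seconds would be consistent with seeing roughly one update for a 1-minute window and a handful of intermediate updates for a 5-minute window.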
Thanks!