I want to batch messages with KStream interface.
I have a Stream with Keys/values I tried to collect them in a tumbling window and then I wanted to process the complete window at once.
builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC)
.aggregateByKey(
HashMap::new,
(aggKey, value, aggregate) -> {
aggregate.put(value.getUuid, value);
return aggregate;
},
TimeWindows.of("intentWindow", 100),
longSerde, mapSerde)
.foreach((wk, values) -> {
The thing is foreach gets called on each update to the KTable. I would like to process the whole window once it is complete. As in collect Data from 100 ms and then process at once. In for each.
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 294
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 295
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 296
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 297
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 298
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 299
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 1
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 2
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 3
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 4
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 5
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 6
at some point the new window starts with 1 entry in the map. So I don't even know when the window is full.
any hints to to batch process in kafka streams