与此问题类似但略有不同:KStream 批处理窗口,我想在将消息KStream
推送给消费者之前对来自 a 的消息进行批处理。
但是,此下推不应安排在固定的时间窗口上,而应安排在每个键的固定消息计数阈值上。
首先想到两个问题:
1)这是AbstractProcessor
应该处理的方式吗?类似于以下内容:
@Override
public void punctuate(long streamTime) {
KeyValueIterator<String, Message[]> it = messageStore.all();
while (it.hasNext())
KeyValue<String, Message[]> entry = it.next();
if (entry.value.length > 10) {
this.context.forward(entry.key, entry.value);
entry.value = new Message[10]();
}
}
}
2)由于StateStore
可能会爆炸(如果条目值永远不会达到阈值以便被转发),“垃圾收集”的最佳方法是什么?我可以制定一个基于时间的计划并删除太旧的密钥……但这看起来非常 DIY 并且容易出错。