我的流是键/值对,我想将它们作为“原始”和 60 秒聚合保存到数据库中。最初我是这样做的:
->foreach
/
kStreamBuilder.stream->aggregateBy->process
但后来我发现
一个。.aggregateby()
只返回它匹配的对(我需要所有它们 - 匹配或其他)
b。我可以在阶段使用HashMap.process()
来实现相同的聚合效果。然后当.punctuate()
被调用时,我将所有 k/v 对写入数据库。
所以得到的拓扑变成:
kStreamBuilder.stream->foreach
kStreamBuilder.stream->process
问题:
- 这是获得写入所有匹配或其他kv对的结果的“合理”方式吗?(所有值通过foreach和任何对 + 其余通过process)
- 在将原始流发送到之前,我是否需要(以某种方式)划分原始流
.foreach()
,.process()
或者是否足以执行上述操作?