2

我的流是键/值对,我想将它们作为“原始”和 60 秒聚合保存到数据库中。最初我是这样做的:

                       ->foreach
                     /
kStreamBuilder.stream->aggregateBy->process

但后来我发现

一个。.aggregateby()只返回它匹配的对(我需要所有它们 - 匹配或其他)
b。我可以在阶段使用HashMap.process()来实现相同的聚合效果。然后当.punctuate()被调用时,我将所有 k/v 对写入数据库。

所以得到的拓扑变成:

kStreamBuilder.stream->foreach
kStreamBuilder.stream->process

问题:

  1. 这是获得写入所有匹配或其他kv对的结果的“合理”方式吗?(所有值通过foreach和任何对 + 其余通过process
  2. 在将原始流发送到之前,我是否需要(以某种方式)划分原始流.foreach().process()或者是否足以执行上述操作?
4

1 回答 1

3

DSL 层的聚合是为“增量聚合”而设计的,即当前聚合结果加上要“添加”的单个新值。如果要一次访问 60 秒窗口的所有“原始记录”,则需要使用处理器 API。

如果你有两个下游运营商,你不需要做任何事情。记录将自动转发给两者。但是,请记住,它们不会被复制,即,下游操作员将看到每条记录的相同 Java 对象!

于 2016-09-05T12:52:40.403 回答