1

我想做的是按两个字段("remote-client-ip", "request-params")对流进行分组,并计算每个组中元组的数量。并将它们组合成一张地图。这是我的拓扑:

topology.newStream("kafka-spout-stream-1", repeatSpout)
                    .each(new Fields("str"), new URLParser(), new   Fields(fieldNames))
                    .each(new Fields("remote-client-ip", "request-params"), new HTTPParameterExtractor(), new Fields("query-string"))
                    .groupBy(new Fields("remote-client-ip", "query-string"))
                    .aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))
                    .groupBy(new Fields("remote-client-ip"))
                    .persistentAggregate(new MemoryMapState.Factory(), new UserQueryStringCombiner(), new Fields("user-word-count-list"));

但是调试后发现数据流一开始就被阻塞了groupBy(),是多字段分组。Count()在随后的聚合语句中,我没有执行任何操作。

所以我想我误解了关于多字段分组和聚合之间交互的一些概念。

请让我知道我的猜测是对还是错。谢谢!

4

1 回答 1

1

Aggregate()您正在使用拓扑中的函数对已分组的字段进行分组。尝试这个:

.aggregate(new Count(), new Fields("user-word-count"))

而不是这个:

.aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))
于 2015-04-17T16:15:56.773 回答