我有一个带有用户请求 URL 的日志流。 每分钟我都想获得一直请求的前 100 个页面并将其保存到 HDFS。
我了解如何为每个 url 维护一些请求:
val ratingItemsStream : DStream[(String,Long)] = lines
.map(LogEntry(_))
.map(entry => (entry.url, 1L))
.reduceByKey(_ + _)
.updateStateByKey(updateRequestCount)
// this provides a DStream of Tuple of [Url, # of requests]
但是接下来我该怎么办?显然,我需要将所有更新传递给主机以维护一个优先队列,然后每 1 分钟获取它的前 K 个。
我怎样才能做到这一点?
UPD:我已经看到了 spark 示例和 algebird 的 MapMonoid 在那里使用。但是由于我不明白它是如何工作的(奇怪的是没有在网上找到任何信息),我不想使用它。我一定有办法,对吧?