2

我有一个带有用户请求 URL 的日志流。 每分钟我都想获得一直请求前 100 个页面并将其保存到 HDFS。

我了解如何为每个 url 维护一些请求:

 val ratingItemsStream : DStream[(String,Long)] = lines
      .map(LogEntry(_))
      .map(entry => (entry.url, 1L))
      .reduceByKey(_ + _)
      .updateStateByKey(updateRequestCount)
// this provides a DStream of Tuple of [Url, # of requests]

但是接下来我该怎么办?显然,我需要将所有更新传递给主机以维护一个优先队列,然后每 1 分钟获取它的前 K 个。

我怎样才能做到这一点?

UPD:我已经看到了 spark 示例和 algebird 的 MapMonoid 在那里使用。但是由于我不明白它是如何工作的(奇怪的是没有在网上找到任何信息),我不想使用它。我一定有办法,对吧?

4

1 回答 1

1

您可以通过对数据进行 x 分钟窗口聚合并应用排序来获得排名来接近它。

val window = ratingItemStream.window(Seconds(windowSize), Seconds(windowSize))
window.forEachRDD{rdd =>
    val byScore = rdd.map(_.swap).sortByKey(ascending=false).zipWithIndex
    val top100 = byScore.collect{case ((score, url), index) if (index<100) => (url, score)}
    top100.saveAsTextFile("./path/to/file/")
}

(示例代码,未经测试!)

请注意,这rdd.top(x)将为您提供比排序/压缩更好的性能,但它会返回一个数组,因此,您可以自己使用 hadoop API 将其保存到 hdfs(我认为这是一个选项)

于 2014-11-16T22:20:38.977 回答