34

阅读 Spark 方法 sortByKey :

sortByKey([ascending], [numTasks])   When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

是否可以只返回“N”个结果。所以不是返回所有结果,而是返回前 10 个。我可以将排序的集合转换为数组并使用take方法,但由于这是一个 O(N) 操作,有没有更有效的方法?

4

3 回答 3

51

如果您只需要前 10 名,请使用rdd.top(10). 它避免了排序,因此速度更快。

rdd.top并行处理数据,收集堆中每个分区中的前 N ​​个,然后合并这些堆。这一个O(rdd.count)操作。排序将是O(rdd.count log rdd.count),并且会导致大量数据传输——它会进行洗牌,因此所有数据都将通过网络传输。

于 2014-06-14T00:20:04.373 回答
19

您很可能已经阅读过源代码:

  class OrderedRDDFunctions {
   // <snip>
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
    val part = new RangePartitioner(numPartitions, self, ascending)
    val shuffled = new ShuffledRDD[K, V, P](self, part)
    shuffled.mapPartitions(iter => {
      val buf = iter.toArray
      if (ascending) {
        buf.sortWith((x, y) => x._1 < y._1).iterator
      } else {
        buf.sortWith((x, y) => x._1 > y._1).iterator
      }
    }, preservesPartitioning = true)
  }

而且,正如您所说,整个数据必须经过洗牌阶段 - 如片段所示。

但是,您对随后调用 take(K) 的担忧可能并不那么准确。此操作不会循环遍历所有 N 项:

  /**
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.
   */
  def take(num: Int): Array[T] = {

那么,看起来:

O(myRdd.take(K)) << O(myRdd.sortByKey()) ~= O(myRdd.sortByKey.take(k)) (至少对于小 K)<< O(myRdd.sortByKey().collect ()

于 2014-05-24T07:23:33.557 回答
8

至少从 PySpark 1.2.0 开始,另一种选择是使用takeOrdered

按升序排列:

rdd.takeOrdered(10)

按降序排列:

rdd.takeOrdered(10, lambda x: -x)

k,v 对的前 k 个值:

rdd.takeOrdered(10, lambda (k, v): -v)
于 2015-06-19T17:12:54.060 回答