您很可能已经阅读过源代码:
class OrderedRDDFunctions {
// <snip>
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
val part = new RangePartitioner(numPartitions, self, ascending)
val shuffled = new ShuffledRDD[K, V, P](self, part)
shuffled.mapPartitions(iter => {
val buf = iter.toArray
if (ascending) {
buf.sortWith((x, y) => x._1 < y._1).iterator
} else {
buf.sortWith((x, y) => x._1 > y._1).iterator
}
}, preservesPartitioning = true)
}
而且,正如您所说,整个数据必须经过洗牌阶段 - 如片段所示。
但是,您对随后调用 take(K) 的担忧可能并不那么准确。此操作不会循环遍历所有 N 项:
/**
* Take the first num elements of the RDD. It works by first scanning one partition, and use the
* results from that partition to estimate the number of additional partitions needed to satisfy
* the limit.
*/
def take(num: Int): Array[T] = {
那么,看起来:
O(myRdd.take(K)) << O(myRdd.sortByKey()) ~= O(myRdd.sortByKey.take(k)) (至少对于小 K)<< O(myRdd.sortByKey().collect ()