15

我有一个火花对 RDD (key, count) 如下

Array[(String, Int)] = Array((a,1), (b,2), (c,1), (d,3))

如何使用 spark scala API 找到计数最高的密钥?

编辑:对 RDD 的数据类型是 org.apache.spark.rdd.RDD[(String, Int)]

4

4 回答 4

22

使用Array.maxBy方法:

val a = Array(("a",1), ("b",2), ("c",1), ("d",3))
val maxKey = a.maxBy(_._2)
// maxKey: (String, Int) = (d,3)

RDD.max

val maxKey2 = rdd.max()(new Ordering[Tuple2[String, Int]]() {
  override def compare(x: (String, Int), y: (String, Int)): Int = 
      Ordering[Int].compare(x._2, y._2)
})
于 2014-11-12T11:56:46.330 回答
13

使用takeOrdered(1)(Ordering[Int].reverse.on(_._2))

val a = Array(("a",1), ("b",2), ("c",1), ("d",3))
val rdd = sc.parallelize(a)
val maxKey = rdd.takeOrdered(1)(Ordering[Int].reverse.on(_._2))
// maxKey: Array[(String, Int)] = Array((d,3))

引用RDD.takeOrdered的注释:

仅当预期结果数组很小时才应使用此方法,因为所有数据都加载到驱动程序的内存中。

于 2015-11-12T10:27:50.620 回答
9

对于 Pyspark:

让我们a将键作为字符串和值作为整数的 RDD 对

a.max(lambda x:x[1])

返回具有最大值的键值对。基本上,max 函数按 lambda 函数的返回值排序。

a是一个带有元素的对RDD,例如,('key',int)并且x[1]只是指元素的整数部分。

请注意,该max函数本身将按键排序并返回最大值。

文档位于https://spark.apache.org/docs/1.5.0/api/python/pyspark.html#pyspark.RDD.max

于 2016-02-06T19:36:39.257 回答
5

当 Spark RDD 保留为 RDD 而不是转换为数组时,它们在时间上更有效

strinIntTuppleRDD.reduce((x, y) => if(x._2 > y._2) x else y)
于 2017-09-24T13:40:09.920 回答