0

我正在尝试以下示例

val lista = List(("a", 3), ("a", 1), ("b", 7), ("a", 5))
val rdd = sc.parallelize(lista)

然后在shell中我得到以下

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[40] at parallelize at <console>:30

但是由于某种原因,我仍然没有弄清楚我能够执行这句话

val resAgg = rdd.aggregateByKey(new HashSet[Int])(_+_, _++_)

在外壳中获取它

resAgg: org.apache.spark.rdd.RDD[(String, scala.collection.mutable.HashSet[Int])] = ShuffledRDD[41] at aggregateByKey at <console>:32

所以我有一些问题:

1.- 名为 rdd 的 var 的真正 RDD 类型是什么?因为在 shell 中它显示的类型是 org.apache.spark.rdd.RDD[(String, Int)] 但查看 API 时,RDD 类没有方法 aggregateByKey。顺便说一句,JavaPairRDD 类是否有 aggregateByKey 方法

2.- 我如何验证/知道 RDD 的真实类型

3.- ParallelCollectionRDD 出现了什么?我在 github 上查找它,发现它是一个私有类,所以我猜想是 scala API 上没有出现 is 的原因,但它是做什么用的?

我使用的是 Spark 1.6.2

4

1 回答 1

4

您看到的是隐式转换的效果:

  • rdd 确实有类型org.apache.spark.rdd.RDD[(String, Int)]
  • 当您尝试调用aggregateByKey并且该类型不存在时,编译器会查找一些隐式转换为某种类型,并找到转换为PairRDDFunctions

    implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
      new PairRDDFunctions(rdd)
    }
    
  • 然后,PairRDDFunctions.aggregateByKey被调用。

至于你的最后一个问题:

什么是 ParallelCollectionRDD

RDD 是一个包含许多子类的抽象类,这是其中之一。一般来说,每个子类负责对RDD执行的不同操作,例如读/写/洗牌/检查点等。这种特定类型在调用时使用SparkContext.parallelize- 意思是,它用于并行化来自驱动程序的集合。事实上,它是私有的,你通常不应该关心你手头实际拥有的 RDD 子类型。

于 2016-07-20T07:25:07.503 回答