2

在最简单的形式中,RDD 只是链式计算的占位符,可以任意安排在任何机器上执行:

val src = sc.parallelize(0 to 1000)

val rdd = src.mapPartitions { itr =>
  Iterator(SparkEnv.get.executorId)
}

for (i <- 1 to 3) {

  val vs = rdd.collect()
  println(vs.mkString)
}

/* yielding:
1230123012301230
0321032103210321
2130213021302130
*/

显然可以通过使任何上游 RDD 持久化来覆盖此行为,这样 Spark 调度程序将最大限度地减少冗余计算:

val src = sc.parallelize(0 to 1000)

src.persist()

val rdd = src.mapPartitions { itr =>
  Iterator(SparkEnv.get.executorId)
}

for (i <- 1 to 3) {

  val vs = rdd.collect()
  println(vs.mkString)
}

/* yield:
2013201320132013
2013201320132013
2013201320132013
each partition has a fixed executorID
*/

现在我的问题是:

我不喜欢 vanilla 缓存机制(请参阅这篇文章:在 Apache Spark 中,我可以增量缓存 RDD 分区吗?)并编写了自己的缓存机制(通过实现新的 RDD)。由于新的缓存机制只能从本地磁盘/内存中读取现有值,如果有多个执行器,每次在另一台机器上的任务中执行分区时,我对每个分区的缓存都会经常丢失。

所以我的问题是:

如何模仿 Spark RDD 持久实现来要求 DAG 调度程序强制/建议位置感知任务调度?没有实际调用该.persist()方法,因为它是不必要的。

4

0 回答 0