在最简单的形式中,RDD 只是链式计算的占位符,可以任意安排在任何机器上执行:
val src = sc.parallelize(0 to 1000)
val rdd = src.mapPartitions { itr =>
Iterator(SparkEnv.get.executorId)
}
for (i <- 1 to 3) {
val vs = rdd.collect()
println(vs.mkString)
}
/* yielding:
1230123012301230
0321032103210321
2130213021302130
*/
显然可以通过使任何上游 RDD 持久化来覆盖此行为,这样 Spark 调度程序将最大限度地减少冗余计算:
val src = sc.parallelize(0 to 1000)
src.persist()
val rdd = src.mapPartitions { itr =>
Iterator(SparkEnv.get.executorId)
}
for (i <- 1 to 3) {
val vs = rdd.collect()
println(vs.mkString)
}
/* yield:
2013201320132013
2013201320132013
2013201320132013
each partition has a fixed executorID
*/
现在我的问题是:
我不喜欢 vanilla 缓存机制(请参阅这篇文章:在 Apache Spark 中,我可以增量缓存 RDD 分区吗?)并编写了自己的缓存机制(通过实现新的 RDD)。由于新的缓存机制只能从本地磁盘/内存中读取现有值,如果有多个执行器,每次在另一台机器上的任务中执行分区时,我对每个分区的缓存都会经常丢失。
所以我的问题是:
如何模仿 Spark RDD 持久实现来要求 DAG 调度程序强制/建议位置感知任务调度?没有实际调用该.persist()
方法,因为它是不必要的。