我有一个在 Spark 上运行的迭代应用程序,我将其简化为以下代码:
var anRDD: org.apache.spark.rdd.RDD[Int] = sc.parallelize((0 to 1000))
var c: Long = Int.MaxValue
var iteration: Int = 0
while (c > 0) {
iteration += 1
// Manipulate the RDD and cache the new RDD
anRDD = anRDD.zipWithIndex.filter(t => t._2 % 2 == 1).map(_._1).cache() //.localCheckpoint()
// Actually compute the RDD and spawn a new job
c = anRDD.count()
println(s"Iteration: $iteration, Values: $c")
}
后续作业中的内存分配会发生什么变化?
- 当前是否
anRDD
“覆盖”了以前的,还是都保存在内存中?从长远来看,这可能会引发一些内存异常 localCheckpoint
有cache
不同的行为吗?如果localCheckpoint
用来代替cache
,localCheckpoint
截断 RDD 沿袭,那么我希望之前的 RDD 会被覆盖