3

我有一个在 Spark 上运行的迭代应用程序,我将其简化为以下代码:

var anRDD: org.apache.spark.rdd.RDD[Int] = sc.parallelize((0 to 1000))
var c: Long = Int.MaxValue 
var iteration: Int = 0
while (c > 0) {
    iteration += 1
    // Manipulate the RDD and cache the new RDD
    anRDD = anRDD.zipWithIndex.filter(t => t._2 % 2 == 1).map(_._1).cache() //.localCheckpoint()
    // Actually compute the RDD and spawn a new job
    c = anRDD.count()
    println(s"Iteration: $iteration, Values: $c")
}

后续作业中的内存分配会发生什么变化?

  • 当前是否anRDD“覆盖”了以前的,还是都保存在内存中?从长远来看,这可能会引发一些内存异常
  • localCheckpointcache不同的行为吗?如果localCheckpoint用来代替cache,localCheckpoint截断 RDD 沿袭,那么我希望之前的 RDD 会被覆盖
4

2 回答 2

3

不幸的是,Spark 似乎不适合这样的事情。

您最初的实现是不可行的,因为在每次迭代中,较新的 RDD 都会对较旧的 RDD 进行内部引用,因此所有 RDD 都会堆积在内存中。

localCheckpoint是您要达到的目标的近似值。它确实会截断 RDD 的沿袭,但您会失去容错能力。此方法的文档中已明确说明。

checkpoint也是一种选择。它是安全的,但它会在每次迭代时将数据转储到 hdfs。

考虑重新设计方法。这样的黑客迟早会咬人。

于 2019-07-30T09:47:50.913 回答
2
  1. RDD 是不可变的,因此每次转换都会返回一个新的 RDD。所有的 anRDD 都将保存在内存中。见下文(为您的代码运行两次迭代),所有 RDD 的 id 都不同 在此处输入图像描述

    所以是的,从长远来看,这可能会引发一些内存异常。完成处理后,您应该取消持久化 rdd 。

  2. localCheckpoint 的用例与缓存不同。它用于截断 RDD 的沿袭。它不将 RDD 存储到磁盘/本地它提高了性能,但反过来又降低了容错性。

于 2019-07-30T09:45:32.687 回答