2

在学习 Spark 时,我阅读了以下内容:

除了流水线之外,如果现有的 RDD 已经保存在集群内存或磁盘上,Spark 的内部调度程序可能会截断 RDD 图的沿袭。在这种情况下,Spark 可以“短路”并根据持久化的 RDD 开始计算。可能发生这种截断的第二种情况是,当 RDD 已经作为早期 shuffle 的副作用实现时,即使它没有显式地 persist()ed。这是一个底层优化,它利用了 Spark shuffle 输出被写入磁盘的事实,并利用了 RDD 图的许多部分被重新计算的事实。

所以,我决定尝试用一个简单的程序(如下)来看看这个:

val pairs = spark.sparkContext.parallelize(List((1,2)))
val x   = pairs.groupByKey()
x.toDebugString  // before collect
x.collect()
x.toDebugString  // after collect

spark.sparkContext.setCheckpointDir("/tmp")
// try both checkpointing and persisting to disk to cut lineage
x.checkpoint()
x.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
x.collect()
x.toDebugString  // after checkpoint

阅读 Spark 书中的上述段落后,我没有看到我的预期。每次调用此方法时,我都看到了完全相同的 toDebugString 输出——每次都指示两个阶段(我原本预计在检查点应该截断沿袭之后只有一个阶段。),如下所示:

scala>     x.toDebugString  // after collect
res5: String =
(8) ShuffledRDD[1] at groupByKey at <console>:25 []
 +-(8) ParallelCollectionRDD[0] at parallelize at <console>:23 []

我想知道我忽略的关键问题是否可能是“可能”这个词,如“时间表可能会截断血统”。在其他情况下,考虑到我上面编写的相同程序,这种截断是否可能发生?还是我写的小程序没有做正确的事情来强制截断血统?提前感谢您提供的任何见解!

4

1 回答 1

1

我认为你应该先坚持/检查点,然后再做collect。从那段代码对我来说,你得到的东西看起来是正确的,因为当 spark 首先collect它不知道它应该坚持或保存任何东西时。

也可能你需要保存结果x.persist然后使用它......我建议 - 试试看:

val pairs = spark.sparkContext.parallelize(List((1,2)))
val x   = pairs.groupByKey()

x.checkpoint()
x.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)

// **Also maybe do val xx = x.persist(...) and use xx later.**

x.toDebugString  // before collect
x.collect()
x.toDebugString  // after collect

spark.sparkContext.setCheckpointDir("/tmp")
// try both checkpointing and persisting to disk to cut lineage
x.collect()
x.toDebugString  // after checkpoint
于 2019-07-15T11:49:04.650 回答