在学习 Spark 时,我阅读了以下内容:
除了流水线之外,如果现有的 RDD 已经保存在集群内存或磁盘上,Spark 的内部调度程序可能会截断 RDD 图的沿袭。在这种情况下,Spark 可以“短路”并根据持久化的 RDD 开始计算。可能发生这种截断的第二种情况是,当 RDD 已经作为早期 shuffle 的副作用实现时,即使它没有显式地 persist()ed。这是一个底层优化,它利用了 Spark shuffle 输出被写入磁盘的事实,并利用了 RDD 图的许多部分被重新计算的事实。
所以,我决定尝试用一个简单的程序(如下)来看看这个:
val pairs = spark.sparkContext.parallelize(List((1,2)))
val x = pairs.groupByKey()
x.toDebugString // before collect
x.collect()
x.toDebugString // after collect
spark.sparkContext.setCheckpointDir("/tmp")
// try both checkpointing and persisting to disk to cut lineage
x.checkpoint()
x.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
x.collect()
x.toDebugString // after checkpoint
阅读 Spark 书中的上述段落后,我没有看到我的预期。每次调用此方法时,我都看到了完全相同的 toDebugString 输出——每次都指示两个阶段(我原本预计在检查点应该截断沿袭之后只有一个阶段。),如下所示:
scala> x.toDebugString // after collect
res5: String =
(8) ShuffledRDD[1] at groupByKey at <console>:25 []
+-(8) ParallelCollectionRDD[0] at parallelize at <console>:23 []
我想知道我忽略的关键问题是否可能是“可能”这个词,如“时间表可能会截断血统”。在其他情况下,考虑到我上面编写的相同程序,这种截断是否可能发生?还是我写的小程序没有做正确的事情来强制截断血统?提前感谢您提供的任何见解!