9

当我运行如下代码时:

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
newRDD.checkpoint
print(newRDD.count())

并观察 Yarn 中的各个阶段,我注意到 Spark 正在执行 DAG 计算 TWICE——一次用于实现 RDD 并缓存它的 distinct+count,然后完全是第二次创建检查点副本。

既然 RDD 已经物化和缓存了,为什么检查点不简单地利用这一点,将缓存的分区保存到磁盘呢?

是否有现有的方法(某种配置设置或代码更改)来强制 Spark 利用这一点并且只运行一次操作,而检查点只会复制东西?

我是否需要“实现”两次?

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(newRDD.count())

newRDD.checkpoint
print(newRDD.count())

我创建了一个 Apache Spark Jira 票证,以使其成为功能请求: https ://issues.apache.org/jira/browse/SPARK-8666

4

3 回答 3

7

看起来这可能是一个已知问题。查看较旧的 JIRA 票证,https://issues.apache.org/jira/browse/SPARK-8582

于 2015-06-26T17:01:48.187 回答
3

这是一个老问题。但它也影响了我,所以我做了一些挖掘。我在 jira 和 github 的更改跟踪历史记录中发现了一堆非常无用的搜索结果。这些搜索结果包含大量来自开发人员关于他们提议的编程更改的技术喋喋不休。这对我来说并没有提供非常丰富的信息,我建议限制你花在看它的时间。

关于此事我能找到的最清晰的信息在这里: https ://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md

需要检查点的 RDD 将被计算两次;因此建议在 rdd.checkpoint() 之前做一个 rdd.cache()

鉴于 OP 实际上确实使用了持久性和检查点,他可能走在正确的轨道上。我怀疑唯一的问题是他调用检查点的方式。我对火花很陌生,但我认为他应该这样做:

newRDD = newRDD.checkpoint

希望这很清楚。根据我的测试,这消除了我的一个数据帧的冗余重新计算。

于 2020-10-07T23:03:29.983 回答
0

您缓存的数据可能会因为内存不足而被驱逐,您可以打开 Spark UI 来检查是否属实。

于 2020-08-24T13:44:33.327 回答