0

似乎有一些关于此的帖子,但似乎没有人回答我的理解。

以下代码在 DataBricks 上运行:

spark.sparkContext.setCheckpointDir("/dbfs/FileStore/checkpoint/cp1/loc7")
val checkpointDir = spark.sparkContext.getCheckpointDir.get
val ds = spark.range(10).repartition(2)
ds.cache()
ds.checkpoint()
ds.count()
ds.rdd.isCheckpointed  

添加了各种改进:

...
val ds2 = ds.checkpoint(eager=true)
println(ds2.queryExecution.toRdd.toDebugString)
...

返回:

(2) MapPartitionsRDD[307] at toRdd at command-1835760423149753:13 []
 |  MapPartitionsRDD[305] at checkpoint at command-1835760423149753:12 []
 |  ReliableCheckpointRDD[306] at checkpoint at command-1835760423149753:12 []
 checkpointDir: String = dbfs:/dbfs/FileStore/checkpoint/cp1/loc10/86cc77b5-27c3-4049-9136-503ddcab0fa9
 ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]
 ds2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
 res53: Boolean = false

问题一:

ds.rdd.isCheckpointedds2.rdd.isCheckpointed都返回False即使计数我有一个非懒惰的情况。为什么,特别是 ../loc 7 & 10 是用(部分)文件编写的?我们还可以看到 ReliableCheckPoint!

没有很好地解释整个概念。试图解决这个问题。

问题 2 - 次要问题:

最新版本的 Spark 2.4 是否真的需要缓存?ds 上的新分支,如果不缓存,会导致重新计算还是现在更好?不使用检查点数据似乎很奇怪,或者我们可以说 Spark 并不真正知道什么更好?

从高性能 Spark 我得到的混合印象是检查点不是那么推荐,但又是这样。

4

1 回答 1

2

TL;DR:您不检查实际检查点的对象:

ds2.queryExecution.toRdd.dependencies(0).rdd.isCheckpointed
// Boolean = true

ds.rdd.isCheckpointed 或 ds2.rdd.isCheckpointed 都返回 False

这是预期的行为。被检查点的对象不是您引用的转换后的 RDD(这是转换为外部表示所需的额外转换的结果),而是内部 RDD 对象(实际上,正如您在上面看到的,它甚至不是最新的内部 RDD,但它的父级)。

此外,在第一种情况下,您只是使用了错误的Dataset对象-如 链接答案中所述, Dataset.checkpoint 返回一个新的Dataset

即使计数我有一个非懒惰的情况

这没有多大意义。默认checkpoint实现eager,因此它强制评估。即使不是这样,也不是强制评估Dataset.count的正确方法。

缓存是否真的需要最新版本

正如您在链接源中看到的那样,在内部Dataset.checkpoint使用,RDD.checkpoint因此适用相同的规则。但是,您已经执行了一个单独的操作来强制检查点,因此额外的缓存,特别是考虑到Dataset持久性的成本,可能是一种矫枉过正。

当然,如果有疑问,您可以考虑在特定环境中进行基准测试。

于 2019-01-02T13:46:42.580 回答