我对缓存的理解是错误的吗?经过我所有的转换后得到的 RDD 非常小,比如 1 GB。计算它的数据非常大,大约 700 GB。
我必须运行逻辑来读取数千个相当大的文件,所有这些都是为了计算更小的结果 RDD。每次迭代都会处理下一批 400 个文件,当读入这些文件时,其大小可能会达到 700 GB 左右。传入的 RDD 以相同的方式处理(读取和转换),然后与累积的 RDD 合并。我在每次迭代后缓存和检查点(也取消持久化(使用blocking = true)生成的rdd的旧版本),以便我可以切断RDD沿袭,这样如果出现问题我就不必重新计算结果,并且以节省执行者的空间。所以,我认为在任何时候我真的只需要 1 GB * 迭代次数 + 大约 750 GB 的内存来完成我的工作,而 1.6 TB 应该绰绰有余。但显然我误解了一些东西。
在每次迭代中,GC 时间越来越长。Spark UI 显示执行程序位于红色区域(> 10% 的时间花在 GC 上)。然后整个工作可能在第 3 次或第 4 次迭代中失败,出现MemoryLimit exceeded、Lost Executor/no path to executor和 YARN 杀死我的执行程序等消息。我认为通过缓存和检查点可以为我的执行程序节省大量空间。我只是不明白是否存在某种内存泄漏?为什么内存不断填满?
我正在使用 m3.large 实例在 EMR 上运行 Spark 2.1.1。我的集群的大小限制在 ~1.6 TB。我使用以下配置运行:
driver-memory 8g
deploy-mode cluster
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=100
spark.dynamicAllocation.maxExecutors=200
spark.shuffle.service.enabled=true
executor-cores 4
executor-memory 8g
我的代码是什么样的:
var accRdd = <empty>
val batchSize = 400
var iteration = 1
filesToIngest.grouped(batchSize).foreach {
val transformedRdd = transform(accRdd).reduceByKey((row1, row2) =>
combine(row1, row2)
)
val oldAccRdd = accRdd
accRdd = accRdd.union(transformedRdd).reduceByKey((row1, row2) =>
combine(row1, row2)
).coalesce(5 + i)
accRdd.persist(MEMORY_AND_DISK_SER)
accRdd.checkpoint()
oldAccRdd.unpersist(blocking = true) // I assume this will ensure all references to this cleared from memory
log_info(s"Total row count on iteration: ${accRdd.count()}")
iteration += 1
}
我遵循了以下建议:https ://github.com/deeplearning4j/nd4j/issues/1251 ,并试图避免调整与 gc、内存分数和 jvm 相关的其他配置变量。同样,我正在寻找可能发生的事情的解释,以及我对缓存/检查点的假设可能是错误的。谢谢!