0

我对缓存的理解是错误的吗?经过我所有的转换后得到的 RDD 非常小,比如 1 GB。计算它的数据非常大,大约 700 GB。

我必须运行逻辑来读取数千个相当大的文件,所有这些都是为了计算更小的结果 RDD。每次迭代都会处理下一批 400 个文件,当读入这些文件时,其大小可能会达到 700 GB 左右。传入的 RDD 以相同的方式处理(读取和转换),然后与累积的 RDD 合并。我在每次迭代后缓存和检查点(也取消持久化(使用blocking = true)生成的rdd的旧版本),以便我可以切断RDD沿袭,这样如果出现问题我就不必重新计算结果,并且以节省执行者的空间。所以,我认为在任何时候我真的只需要 1 GB * 迭代次数 + 大约 750 GB 的内存来完成我的工作,而 1.6 TB 应该绰绰有余。但显然我误解了一些东西。

在每次迭代中,GC 时间越来越长。Spark UI 显示执行程序位于红色区域(> 10% 的时间花在 GC 上)。然后整个工作可能在第 3 次或第 4 次迭代中失败,出现MemoryLimit exceededLost Executor/no path to executor和 YARN 杀死我的执行程序等消息。我认为通过缓存和检查点可以为我的执行程序节省大量空间。我只是不明白是否存在某种内存泄漏?为什么内存不断填满?

我正在使用 m3.large 实例在 EMR 上运行 Spark 2.1.1。我的集群的大小限制在 ~1.6 TB。我使用以下配置运行:

driver-memory 8g
deploy-mode cluster 
spark.dynamicAllocation.enabled=true 
spark.dynamicAllocation.minExecutors=100 
spark.dynamicAllocation.maxExecutors=200
spark.shuffle.service.enabled=true 
executor-cores 4 
executor-memory 8g 

我的代码是什么样的:

var accRdd = <empty>
val batchSize = 400
var iteration = 1
filesToIngest.grouped(batchSize).foreach {
    val transformedRdd = transform(accRdd).reduceByKey((row1, row2) => 
      combine(row1, row2)
    )
    val oldAccRdd = accRdd
    accRdd = accRdd.union(transformedRdd).reduceByKey((row1, row2) => 
      combine(row1, row2)
    ).coalesce(5 + i)
    accRdd.persist(MEMORY_AND_DISK_SER)
    accRdd.checkpoint()
    oldAccRdd.unpersist(blocking = true) // I assume this will ensure all references to this cleared from memory
    log_info(s"Total row count on iteration: ${accRdd.count()}")
    iteration += 1
}

我遵循了以下建议:https ://github.com/deeplearning4j/nd4j/issues/1251 ,并试图避免调整与 gc、内存分数和 jvm 相关的其他配置变量。同样,我正在寻找可能发生的事情的解释,以及我对缓存/检查点的假设可能是错误的。谢谢!

4

1 回答 1

0

您可能想查看我们记忆页面中的一些建议: https ://deeplearnin4j.org/memory

https://deeplearning4j.org/spark

一般来说,deeplearning4j 有自己的堆外内存。您应该使用更多的执行程序设置较低的批处理大小,注意 javacpp off heap 配置并设置允许的 spark 内存。

于 2017-10-17T02:21:19.937 回答