4

我正在尝试保留一个 spark RDD,其中每个分区的元素都共享对单个大对象的访问。但是,该对象似乎多次存储在内存中。将我的问题减少到只有一个只有 200 个元素的单个分区的玩具箱:

val nElements = 200
class Elem(val s:Array[Int])

val rdd = sc.parallelize(Seq(1)).mapPartitions( _ => {
    val sharedArray = Array.ofDim[Int](10000000) // Should require ~40MB
    (1 to nElements).toIterator.map(i => new Elem(sharedArray))
}).cache()

rdd.count() //force computation    

这会消耗预期的内存量,如日志中所示:

storage.MemoryStore:块 rdd_1_0 作为值存储在内存中(估计大小 38.2 MB,空闲 5.7 GB)

但是,200 是最大元素数。设置nElements=201产量:

storage.MemoryStore:块 rdd_1_0 作为值存储在内存中(估计大小 76.7 MB,空闲 5.7 GB)

这是什么原因造成的?这个神奇的数字 200 是从哪里来的,我该如何增加呢?


编辑澄清

向函数添加 println 表明它确实只被调用了一次。此外,运行:

rdd.map(_.s.hashCode).min == rdd.map(_.s.hashCode).max  // returns true

..揭示了所有 10000000 个元素确实指向同一个对象,因此数据结构基本上表现正确。当 nExamples 大得多(例如 20000)时,问题就出现了,因此它不能持续存在。

storage.MemoryStore:没有足够的空间在内存中缓存 rdd_1_0!(目前计算为 6.1 GB)

当我nExamples=500成功设置它时,rdd 在内存中说估计大小 1907.4 MB,但我可以看到我的内存使用量的实际增加远小于这个。

4

1 回答 1

0

对于将来遇到此问题的任何人,我最终想出了一个超级 hacky 的解决方案(尽管我仍然很高兴听到更好的解决方案)。我没有使用 rdd.cache(),而是定义:

def cached[T: ClassTag](rdd:RDD[T]) = {
    rdd.mapPartitions(p => 
        Iterator(p.toSeq)
    ).cache().mapPartitions(p =>
        p.next().toIterator
    )
}

以便cached(rdd)返回从“缓存”列表生成的 RDD

于 2014-12-09T23:55:25.383 回答