1

我正在使用 Scala 处理大数据,因此内存和时间对我来说比通常更重要。我试图通过细分大型源文件Iterator[String]获得的初始值来提高某些评估的速度,getLines以便并行进行一些子评估并合并结果。我通过递归地slice将迭代器分成两半并调用每个子迭代器上的递归函数来做到这一点。现在,我想知道为什么我得到 GCoverhead 或 JavaHeapSpace 异常,虽然“关键”元素只在递归步骤之前评估一次(为了获得迭代器的大小),但我认为不在递归步骤中,因为slice再次返回一个迭代器(这在实现上是非严格的)。在连接子列表之前,以下(简化!)代码将无法应用于 ~15g 文件。

.duplicate在每个步骤中使用。我查了api,文档.duplicate说“实现可能为一个迭代器迭代的元素分配临时存储空间,但另一个迭代器还没有。”,但还没有元素被迭代。有人可以给我一个提示,那里出了什么问题以及如何解决这个问题?太感谢了!

type itType = Iterator[String]
def src = io.Source.fromFile(args(0)).getLines

// recursively divide into equal size blocks in divide&conquer fashion
def getSubItsDC(it: itType, depth: Int = 4) = {
    println("Getting length of file..")
    val totalSize = src.length
    println(totalSize)
    def rec(it_rec: itType = it, depth_rec: Int = depth, size: Int = totalSize): 
        List[itType] = depth_rec match {
            case n if n > 0 => 
                println(n)
                val (it1, it2) = it_rec.duplicate
                val newSize = size/2
                rec(it1 slice (0,newSize), n-1, newSize) ++ 
                    rec(it2 slice (newSize,size), n-1, newSize)
            case n if n == 0 => List(it_rec)
    }
    println("Starting recursion..")
    rec()
}
getSubItsDC(src)

在 REPL 中,代码以任意大小的迭代器运行同样快(当硬编码 totalSize 时),因此我假设了正确的惰性。

4

1 回答 1

2

我认为您最好使用itr grouped size来获得Iterator[Iterator[String]](a GroupedIterator):

scala> val itr = (1 to 100000000).iterator grouped 1000000
itr: Iterator[Int]#GroupedIterator[Int] = non-empty iterator

这将允许您分块处理文件的某些部分。

为什么您的解决方案使用太多内存

复制 anIterator显然一种操作,这意味着 Iterator 可能必须缓存其计算值。例如:

scala> val itr = (1 to 100000000).iterator
itr: Iterator[Int] = non-empty iterator

scala> itr filter (_ % 10000000 == 0) foreach println
10000000
....
100000000

但是当我复制时:

scala> val (a, b) = (1 to 100000000).iterator.duplicate
a: Iterator[Int] = non-empty iterator
b: Iterator[Int] = non-empty iterator

scala> a filter (_ % 10000000 == 0) foreach println

//oh dear, garbage collecting
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded

在这个例子中,当我运行时a,为了b重复,a已经迭代b没有迭代的元素需要被缓存

于 2012-06-12T15:29:42.750 回答