我正在尝试使用 Scala 的并行集合来并行调度一些计算。因为有很多输入数据,所以我使用可变数组来存储数据以避免 GC 问题。这是我最初采用的方法:
// initialize the reusable input data structure
val inputData = new Array[Array[Int]](Runtime.getRuntime.availableProcessors*ChunkSize)
for (i <- 0 until inputData.length) {
inputData(i) = new Array[Int](arraySize)
}
// process the input
while (haveMoreInput()) {
// read the input--must be sequential!
for (array <- 0 until inputData.length) {
for (index <- 0 until arraySize) {
array(index) = deserializeFromExternalSource()
}
}
// map the data in parallel
// note that the input data is NOT modified by longRuningProcess
val results = for (array <- inputData.par) yield {
longRunningProcess(array)
}
// use the results--must be sequential and ordered as input
for (result <- results.toArray) {
useResult(result)
}
}
鉴于 aParallelArray
的底层数组可以安全地重用(即,修改并用作 another 的底层结构ParallelArray
),上面的片段应该可以按预期工作。但是,运行时它会因内存错误而崩溃:
*** Error in `*** Error in `java': double free or corruption (fasttop): <memory address> ***
这表面上与并行集合直接使用创建它的数组有关;也许它在超出范围时试图释放这个数组。在任何情况下,由于内存限制,不能为每个循环创建一个新数组。var parInputData = inputData.par
显式地在循环内部和外部创建一个while
循环会导致相同的双释放错误。
我不能简单地使inputData
自己成为一个并行集合,因为它需要按顺序填充(尝试对并行版本进行分配,我意识到分配不是按顺序执行的)。使用 aVector
作为外部数据结构似乎适用于相对较小的输入大小(< 1000000 个输入数组),但会导致大输入的 GC 开销异常。
我最终采用的方法涉及制作Vector[Vector[Array[Int]]]
, 外部向量的长度等于正在使用的并行线程的数量。然后我用一大块输入数据数组手动填充每个子Vector
,然后在外部向量上进行并行映射。
最后一种方法是可行的,但是手动将输入分成块并将这些块添加到另一个深度的并行集合中是很乏味的。有没有办法让 Scala 重用可变数组进行并行操作?
编辑:将上面的并行向量解决方案与使用同步队列的手动并行化解决方案进行基准比较,表明并行向量要慢约 50%。我想知道这是否只是更好抽象的开销,或者是否可以通过使用并行数组而不是Vector
s 来减少这种差距;Vector
这将导致使用数组而不是s 的另一个好处。