2

我正在尝试使用 Scala 的并行集合来并行调度一些计算。因为有很多输入数据,所以我使用可变数组来存储数据以避免 GC 问题。这是我最初采用的方法:

// initialize the reusable input data structure
val inputData = new Array[Array[Int]](Runtime.getRuntime.availableProcessors*ChunkSize)
for (i <- 0 until inputData.length) {
  inputData(i) = new Array[Int](arraySize)
}

// process the input
while (haveMoreInput()) {
  // read the input--must be sequential!
  for (array <- 0 until inputData.length) {
    for (index <- 0 until arraySize) {
      array(index) = deserializeFromExternalSource()
    }
  }
  // map the data in parallel
  // note that the input data is NOT modified by longRuningProcess
  val results = for (array <- inputData.par) yield {
    longRunningProcess(array)
  }
  // use the results--must be sequential and ordered as input
  for (result <- results.toArray) {
    useResult(result)
  }
}

鉴于 aParallelArray的底层数组可以安全地重用(即,修改并用作 another 的底层结构ParallelArray),上面的片段应该可以按预期工作。但是,运行时它会因内存错误而崩溃:

*** Error in `*** Error in `java': double free or corruption (fasttop): <memory address> ***

这表面上与并行集合直接使用创建它的数组有关;也许它在超出范围时试图释放这个数组。在任何情况下,由于内存限制,不能为每个循环创建一个新数组。var parInputData = inputData.par显式地在循环内部和外部创建一个while循环会导致相同的双释放错误。

我不能简单地使inputData自己成为一个并行集合,因为它需要按顺序填充(尝试对并行版本进行分配,我意识到分配不是按顺序执行的)。使用 aVector作为外部数据结构似乎适用于相对较小的输入大小(< 1000000 个输入数组),但会导致大输入的 GC 开销异常。

我最终采用的方法涉及制作Vector[Vector[Array[Int]]], 外部向量的长度等于正在使用的并行线程的数量。然后我用一大块输入数据数组手动填充每个子Vector,然后在外部向量上进行并行映射。

最后一种方法是可行的,但是手动将输入分成块并将这些块添加到另一个深度的并行集合中是很乏味的。有没有办法让 Scala 重用可变数组进行并行操作?

编辑:将上面的并行向量解决方案与使用同步队列的手动并行化解决方案进行基准比较,表明并行向量要慢约 50%。我想知道这是否只是更好抽象的开销,或者是否可以通过使用并行数组而不是Vectors 来减少这种差距;Vector这将导致使用数组而不是s 的另一个好处。

4

1 回答 1

3

将数据拆分成块并没有真正的意义,Parallel Collections 库的大部分意义在于它可以为您做到这一点,并且比使用固定的块大小做得更好。此外,JVM 上的数组与 C 中的数组不同,它们更像是指向许多小数组的指针数组,这使得它们效率低下。

解决这个问题的更优雅的方法是使用普通Array和使用ParRange对其进行操作。longRunningProcess必须更改为一次对单个元素进行操作:

val arraySize = ???

val inputData = Array[Int](arraySize)
val outputData = Array[ResultType](arraySize)

while(haveMoreInput()) {
  for (i <- 0 until arraySize)
    inputData(i) = deserializeFromExternalSource()
  for (i <- (0 until arraySize).par)
    outputData(i) = longRunningProcess(inputData(i))
  outputData.foreach(useResult)
}

这仅使用两个大数组,并且从不分配任何新数组。ParArray.map, ParArray.toArray, 并Array.par在原始代码中分配了新数组。

我们仍然必须使用固定arraySize来确保我们不会将更多数据加载到我们有空间的内存中。更好的解决方案是使用响应式流,但它们还没有准备好投入生产。

于 2014-08-01T00:25:06.327 回答