1

我有一个大型批处理并行计算,我在 scala 中使用并行映射。我注意到随着工作人员完成,CPU 使用率似乎逐渐下降。这一切都归结为对 Map 对象内部的调用的调用

scala.collection.parallel.thresholdFromSize(length, tasksupport.parallelismLevel)

查看代码,我看到了这个:

def thresholdFromSize(sz: Int, parallelismLevel: Int) = {
  val p = parallelismLevel
  if (p > 1) 1 + sz / (8 * p)
  else sz
}

我的计算在大量内核上运行良好,现在我明白了为什么..

thesholdFromSize(1000000,24) = 5209
thesholdFromSize(1000000,4) = 31251

如果我在 24 个 CPU 上有一个长度为 1000000 的数组,它将一直分区到 5209 个元素。如果我将相同的数组传递到我的 4 CPU 机器上的并行集合中,它将在 31251 个元素处停止分区。

需要注意的是,我计算的运行时间并不统一。每个单元的运行时间可以长达 0.1 秒。对于 31251 个项目,这是 3100 秒或 52 分钟的时间,其他工人可以介入并抢夺工作,但不是。在并行计算期间监视 CPU 利用率时,我已经准确地观察到了这种行为。显然我很想在大型机器上运行,但这并不总是可能的。

我的问题是:有什么方法可以影响并行集合,给它一个更适合我的问题的更小的阈值?我唯一能想到的就是自己实现“Map”类,但这似乎是一个非常不优雅的解决方案。

4

3 回答 3

0

我认为你需要做的就是这样的:

yourCollection.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(24))

parallelism参数默认为您拥有的 CPU 内核数,但您可以像上面那样覆盖它。这也显示在源代码中ParIterableLike

于 2013-10-09T13:57:01.877 回答
0

您想阅读配置 Scala 并行集合。特别是,您可能需要实现 TaskSupport 特征。

于 2013-10-08T21:52:40.747 回答
0

0.1 秒足以单独处理它。将每个单元(或 10 个单元)的处理包装在单独的 Runnable 中,并将它们全部提交到 FixedThreadPool。另一种方法是使用 ForkJoinPool - 这样更容易控制所有计算的结束。

于 2013-10-10T20:39:39.670 回答