我有一个大型批处理并行计算,我在 scala 中使用并行映射。我注意到随着工作人员完成,CPU 使用率似乎逐渐下降。这一切都归结为对 Map 对象内部的调用的调用
scala.collection.parallel.thresholdFromSize(length, tasksupport.parallelismLevel)
查看代码,我看到了这个:
def thresholdFromSize(sz: Int, parallelismLevel: Int) = {
val p = parallelismLevel
if (p > 1) 1 + sz / (8 * p)
else sz
}
我的计算在大量内核上运行良好,现在我明白了为什么..
thesholdFromSize(1000000,24) = 5209
thesholdFromSize(1000000,4) = 31251
如果我在 24 个 CPU 上有一个长度为 1000000 的数组,它将一直分区到 5209 个元素。如果我将相同的数组传递到我的 4 CPU 机器上的并行集合中,它将在 31251 个元素处停止分区。
需要注意的是,我计算的运行时间并不统一。每个单元的运行时间可以长达 0.1 秒。对于 31251 个项目,这是 3100 秒或 52 分钟的时间,其他工人可以介入并抢夺工作,但不是。在并行计算期间监视 CPU 利用率时,我已经准确地观察到了这种行为。显然我很想在大型机器上运行,但这并不总是可能的。
我的问题是:有什么方法可以影响并行集合,给它一个更适合我的问题的更小的阈值?我唯一能想到的就是自己实现“Map”类,但这似乎是一个非常不优雅的解决方案。