这个想法是在 96 核机器上运行并行作业,工作窃取ForkJoinPool
.
以下是我目前使用的代码:
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
val sequence: ParSeq[Item] = getItems().par
sequence.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool())
val results = for {
item <- sequence
res = doSomethingWith(item)
} yield res
在这里,sequence
有大约20,000个项目。大多数项目需要 2-8 秒来处理,其中只有大约 200 个需要更长的时间,大约 40 秒。
问题:
一切都运行良好,但是,工作窃取方面似乎并没有很好地工作。以下是随着时间的推移与实际负载(蓝色)相比的预期总 CPU 负载(黑色):
查看 CPU 活动时,很明显随着工作接近尾声,使用的内核越来越少。在最后 10 分钟内,只有 2 或 3 个核心还在忙于依次处理数十个项目,一个接一个。
为什么仍然在队列中的项目不会被其他空闲内核窃取,即使使用ForkJoinPool
应该是工作窃取的 a ?
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html