我有一个Iterable
需要执行的“工作单元”,没有特定的顺序,并且可以轻松地并行运行而不会相互干扰。
不幸的是,一次运行太多会超出我的可用 RAM,所以我需要确保在任何给定时间只有少数同时运行。
最基本的,我想要一个这种类型签名的函数:
parMap[A, B](xs: Iterator[A], f: A => B, chunkSize: Int): Iterator[B]
这样输出Iterator
不一定与输入的顺序相同(如果我想了解结果的来源,我可以输出一对与输入或其他东西。)然后消费者可以增量地使用生成的迭代器,而无需占用机器的所有内存,同时为该任务保持尽可能多的并行性。
此外,我希望该功能尽可能高效。例如,我最初的想法是按照以下方式做一些事情:
xs.iterator.grouped(chunkSize).flatMap(_.toSet.par.map(f).iterator)
我希望它toSet
会通知 Scala 的并行集合,一旦它们准备好,它就可以开始从其迭代器中以任何顺序生成元素,并且grouped
调用是限制同时工作的数量。不幸的是,调用看起来并没有toSet
达到预期的效果(在我的实验中,结果的返回顺序与没有par
调用时的顺序相同)并且grouped
通话是次优的。例如,如果我们的组大小为 100,其中 99 个作业立即在十几个核心上完成,但其中一个特别慢,则其余大部分核心将处于空闲状态,直到我们可以移动到下一个组。拥有一个最多与我的块大小一样大的“自适应窗口”会更干净,但不会被缓慢的工作人员阻止。
我可以设想自己用工作窃取(de)队列或类似的东西来写这样的东西,但我想在Scala的并行中已经在某种程度上为我完成了处理并发原语的许多艰苦工作收藏图书馆。有谁知道我可以重用它的哪些部分来构建这个功能,或者对如何实现这样的操作有其他建议?