3

我有一个Iterable需要执行的“工作单元”,没有特定的顺序,并且可以轻松地并行运行而不会相互干扰。

不幸的是,一次运行太多会超出我的可用 RAM,所以我需要确保在任何给定时间只有少数同时运行。

最基本的,我想要一个这种类型签名的函数:

parMap[A, B](xs: Iterator[A], f: A => B, chunkSize: Int): Iterator[B]

这样输出Iterator不一定与输入的顺序相同(如果我想了解结果的来源,我可以输出一对与输入或其他东西。)然后消费者可以增量地使用生成的迭代器,而无需占用机器的所有内存,同时为该任务保持尽可能多的并行性。

此外,我希望该功能尽可能高效。例如,我最初的想法是按照以下方式做一些事情:

xs.iterator.grouped(chunkSize).flatMap(_.toSet.par.map(f).iterator)

我希望它toSet会通知 Scala 的并行集合,一旦它们准备好,它就可以开始从其迭代器中以任何顺序生成元素,并且grouped调用是限制同时工作的数量。不幸的是,调用看起来并没有toSet达到预期的效果(在我的实验中,结果的返回顺序与没有par调用时的顺序相同)并且grouped通话是次优的。例如,如果我们的组大小为 100,其中 99 个作业立即在十几个核心上完成,但其中一个特别慢,则其余大部分核心将处于空闲状态,直到我们可以移动到下一个组。拥有一个最多与我的块大小一样大的“自适应窗口”会更干净,但不会被缓慢的工作人员阻止。

我可以设想自己用工作窃取(de)队列或类似的东西来写这样的东西,但我想在Scala的并行中已经在某种程度上为我完成了处理并发原语的许多艰苦工作收藏图书馆。有谁知道我可以重用它的哪些部分来构建这个功能,或者对如何实现这样的操作有其他建议?

4

1 回答 1

3

并行集合框架允许您指定用于给定任务的最大线程数。使用 scala-2.10,您需要执行以下操作:

def parMap[A,B](x : Iterable[A], f : A => B, chunkSize : Int) = {
  val px = x.par
  px.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(chunkSize))
  px map f
}

chunkSize这将防止在任何时候运行多个操作。这使用下面的工作窃取策略来保持演员工作,因此不会遇到与grouped上面示例相同的问题。

但是,这样做不会将结果重新排序为第一个完成的顺序。为此,我建议将您的操作变成一个演员,并让一个小型演员池运行这些操作,然后在他们完成时将结果发送回您。

于 2013-02-06T15:50:57.293 回答