16

是否有可能,使用 Scala 的并行集合来并行化 aIterator 而无需事先完全评估它?

在这里,我说的是并行化 an 上的函数转换Iterator,即mapflatMap。我认为这需要Iterator提前评估一些元素,然后计算更多,一旦通过next.

我所能找到的只是需要将迭代器转换为 aIterableStream充其量是 a 。当Stream我调用它时,它会被完全评估.par

如果这不是现成的,我也欢迎实施建议。实现应该支持并行mapflatMap.

4

4 回答 4

6

我意识到这是一个老问题,但是iterataParIterator库中的实现是否符合您的要求?

scala> import com.timgroup.iterata.ParIterator.Implicits._
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId))
scala> it.map(_._2).toSet.size
res2: Int = 8 // addition was distributed over 8 threads
于 2015-06-11T23:58:49.243 回答
4

使用标准库的最佳选择可能不是使用并行集合,而是concurrent.Future.traverse

import concurrent._
import ExecutionContext.Implicits.global
Future.traverse(Iterator(1,2,3))(i => Future{ i*i })

虽然我认为这将尽快开始执行整个事情。

于 2013-06-18T20:44:48.640 回答
2

从 ML,并行遍历迭代器元素:

https://groups.google.com/d/msg/scala-user/q2NVdE6MAGE/KnutOq3iT3IJ

Future.traverse出于类似的原因,我搬走了。对于我的用例,保持 N 个作业正常工作,我最后编写了代码来限制从作业队列中提供执行上下文。

我的第一次尝试涉及阻塞馈线线程,但这也有可能阻塞想要在执行上下文中生成任务的任务。你知道吗,阻塞是邪恶的。

于 2013-06-19T04:58:19.457 回答
0

准确地遵循您的目标有点困难,但也许是这样的:

val f = (x: Int) => x + 1
val s = (0 to 9).toStream map f splitAt(6) match { 
  case (left, right) => left.par; right 
}

这将在前 6 个元素上并行计算 f,然后在其余元素上返回一个流。

于 2013-06-19T05:00:50.960 回答