那么“如何?”的答案是什么?是 :
import scalaz.stream._
import scalaz.stream.async._
import Process._
def doSomething(i: Int) = if (i == 0) Nil else List(i - 1)
val q = unboundedQueue[Int]
val out = unboundedQueue[Int]
q.dequeue
.flatMap(e => emitAll(doSomething(e)))
.observe(out.enqueue)
.to(q.enqueue).run.runAsync(_ => ()) //runAsync can process failures, there is `.onFailure` as well
q.enqueueAll(List(3,5,7)).run
q.size.continuous
.filter(0==)
.map(_ => -1)
.to(out.enqueue).once.run.runAsync(_ => ()) //call it only after enqueueAll
import scalaz._, Scalaz._
val result = out
.dequeue
.takeWhile(_ != -1)
.map(_.point[List])
.foldMonoid.runLast.run.get //run synchronously
结果:
result: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
但是,您可能会注意到:
1)我必须解决终止问题。akka-stream 存在同样的问题,并且更难在那里解决,因为您无法访问队列并且没有自然的背压来保证队列不会因为快速阅读器而为空。
2)我不得不为输出引入另一个队列(并将其转换为List
),因为工作队列在计算结束时变为空。
因此,这两个库都不太适合此类要求(有限流),但是 scalaz-stream(在删除 scalaz 依赖项后将变为“fs2”)足够灵活,可以实现您的想法。最大的“但是”是默认情况下它将按顺序运行。(至少)有两种方法可以让它更快:
1)将你的doSomething分成多个阶段,.flatMap(doSomething1).flatMap(doSomething2).map(doSomething3)
然后在它们之间放置另一个队列(如果阶段花费相同的时间,大约快3倍)。
2) 并行化队列处理。AkkamapAsync
可以做到这一点——它可以map
自动并行执行。Scalaz-stream 有块 - 你可以将你的 q 分成块,比如说 5,然后并行处理块中的每个元素。无论如何,这两种解决方案(akka vs scalaz)都不太适合使用一个队列作为输入和输出。
但是,再一次,这一切都太复杂和毫无意义,因为有一个经典的简单方法:
@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] =
if (l.isEmpty) acc else {
val processed = l.flatMap(doSomething)
calculate(processed, acc ++ processed)
}
scala> calculate(List(3,5,7), Nil)
res5: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
这是并行化的:
@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] =
if (l.isEmpty) acc else {
val processed = l.par.flatMap(doSomething).toList
calculate(processed, acc ++ processed)
}
scala> calculate(List(3,5,7), Nil)
res6: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
所以,是的,我会说 scalaz-stream 和 akka-streams 都不符合您的要求;然而,经典的 scala 平行系列非常适合。
如果您需要跨多个 JVM 的分布式计算 - 看看 Apache Spark,它的 scala-dsl 使用相同的 map/flatMap/fold 样式。它允许您使用不适合 JVM 内存的大型集合(通过跨 JVM 扩展它们),因此您可以@tailrec def calculate
通过使用 RDD 而不是List
. 它还将为您提供处理内部故障的工具doSomething
。
PS 所以这就是为什么我不喜欢使用流媒体库来完成这些任务的原因。流式传输更像是来自某些外部系统(如 HttpRequests)的无限流,而不是预定义(甚至是大)数据的计算。
PS2 如果您需要类似反应(没有阻塞),您可以使用Future
(或scalaz.concurrent.Task
) +Future.sequence