0

我有一种预感,我可以(应该?)使用 scalaz-streams 来解决我的问题,就像这样。

我有一个起始项目 A。我有一个接受 A 并返回 A 列表的函数。

def doSomething(a : A) : List[A]

我有一个以 1 项(起始项)开头的工作队列。当我们处理 ( doSomething) 每个项目时,它可能会将许多项目添加到同一工作队列的末尾。然而,在某些时候(在数百万个项目之后),我们doSomething处理的每个后续项目将开始向工作队列添加越来越少的项目,最终不会添加新项目(doSomething 将为这些项目返回 Nil)。这就是我们知道计算最终将终止的方式。

假设 scalaz-streams 适用于此,请给我一些提示,说明我应该考虑哪些整体结构或类型来实现它?

一旦完成了使用单个“worker”的简单实现,我还想使用多个 worker 并行处理队列项,例如拥有 5 个 worker 池(每个 worker 将其任务分配给代理进行计算doSomething)所以我需要在这个算法中处理效果(比如工人失败)。

4

1 回答 1

2

那么“如何?”的答案是什么?是 :

import scalaz.stream._
import scalaz.stream.async._
import Process._

def doSomething(i: Int) = if (i == 0) Nil else List(i - 1)

val q = unboundedQueue[Int]
val out = unboundedQueue[Int]

q.dequeue
 .flatMap(e => emitAll(doSomething(e)))
 .observe(out.enqueue)
 .to(q.enqueue).run.runAsync(_ => ()) //runAsync can process failures, there is `.onFailure` as well

q.enqueueAll(List(3,5,7)).run
q.size.continuous
 .filter(0==)
 .map(_ => -1)
 .to(out.enqueue).once.run.runAsync(_ => ()) //call it only after enqueueAll

import scalaz._, Scalaz._
val result = out
  .dequeue
  .takeWhile(_ != -1)
  .map(_.point[List])
  .foldMonoid.runLast.run.get //run synchronously

结果:

result: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)

但是,您可能会注意到:

1)我必须解决终止问题。akka-stream 存在同​​样的问题,并且更难在那里解决,因为您无法访问队列并且没有自然的背压来保证队列不会因为快速阅读器而为空。

2)我不得不为输出引入另一个队列(并将其转换为List),因为工作队列在计算结束时变为空。

因此,这两个库都不太适合此类要求(有限流),但是 scalaz-stream(在删除 scalaz 依赖项后将变为“fs2”)足够灵活,可以实现您的想法。最大的“但是”是默认情况下它将按顺序运行。(至少)有两种方法可以让它更快:

1)将你的doSomething分成多个阶段,.flatMap(doSomething1).flatMap(doSomething2).map(doSomething3)然后在它们之间放置另一个队列(如果阶段花费相同的时间,大约快3倍)。

2) 并行化队列处理。AkkamapAsync可以做到这一点——它可以map自动并行执行。Scalaz-stream 有块 - 你可以将你的 q 分成块,比如说 5,然后并行处理块中的每个元素。无论如何,这两种解决方案(akka vs scalaz)都不太适合使用一个队列作为输入和输出。

但是,再一次,这一切都太复杂和毫无意义,因为有一个经典的简单方法:

@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] = 
  if (l.isEmpty) acc else { 
    val processed = l.flatMap(doSomething) 
    calculate(processed, acc ++ processed) 
  }

scala> calculate(List(3,5,7), Nil)
res5: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)

这是并行化的:

@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] = 
  if (l.isEmpty) acc else { 
    val processed = l.par.flatMap(doSomething).toList
    calculate(processed, acc ++ processed) 
  }

scala> calculate(List(3,5,7), Nil)
res6: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)

所以,是的,我会说 scalaz-stream 和 akka-streams 都不符合您的要求;然而,经典的 scala 平行系列非常适合。

如果您需要跨多个 JVM 的分布式计算 - 看看 Apache Spark,它的 scala-dsl 使用相同的 map/flatMap/fold 样式。它允许您使用不适合 JVM 内存的大型集合(通过跨 JVM 扩展它们),因此您可以@tailrec def calculate通过使用 RDD 而不是List. 它还将为您提供处理内部故障的工具doSomething

PS 所以这就是为什么我不喜欢使用流媒体库来完成这些任务的原因。流式传输更像是来自某些外部系统(如 HttpRequests)的无限流,而不是预定义(甚至是大)数据的计算。

PS2 如果您需要类似反应(没有阻塞),您可以使用Future(或scalaz.concurrent.Task) +Future.sequence

于 2015-12-29T05:52:26.890 回答