8

使用 scalaz-stream 是否可以拆分/分叉然后重新加入流?

例如,假设我有以下功能

val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)

val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add)
val sumOfOddNumbers  = streamOfNumbers.filter(isOdd).fold(0)(add)

zip( sumOfEven, sumOfOdd ).to( someEffectfulFunction )

使用 scalaz-stream,在此示例中,结果将如您所料 - 一个从 1 到 10 的数字元组传递到接收器。

但是,如果我们替换streamOfNumbers为需要 IO 的东西,它实际上会执行两次 IO 操作。

使用 aTopic我可以创建一个 pub/sub 进程,它可以正确地复制流中的元素,但它不会缓冲 - 它只是尽可能快地使用整个源,而不管接收器消耗它的速度如何。

我可以将它包装在一个有界队列中,但是最终结果感觉比它需要的要复杂得多。

有没有一种更简单的方法可以在 scalaz-stream 中拆分流而无需从源中重复 IO 操作?

4

3 回答 3

6

还要用“拆分”要求来澄清先前的答案 delas。您的特定问题的解决方案可能不需要拆分流:

val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)
val oddOrEven: Process[Task,Int\/Int] = streamOfNumbers.map {
   case even if even % 2 == 0 => right(even)
   case odd => left(odd)
} 
val summed = oddOrEven.pipeW(sump1).pipeO(sump1)

val evenSink: Sink[Task,Int] = ???
val oddSink: Sink[Task,Int] = ???

summed
.drainW(evenSink)
.to(oddSink)
于 2014-12-18T08:25:10.060 回答
2

您也许仍然可以使用主题,并确保在您推送主题之前子进程将订阅。

但是请注意,此解决方案没有任何限制,即如果您推得太快,您可能会遇到 OOM 错误。

def split[A](source:Process[Task,A]): Process[Task,(Process[Task,A], Proces[Task,A])]] = {
  val topic = async.topic[A]

  val sub1 = topic.subscribe
  val sub2 = topic.subscribe

  merge.mergeN(Process(emit(sub1->sub2),(source to topic.publish).drain))
}
于 2014-12-18T08:16:13.880 回答
0

我同样需要这个功能。我的情况有点棘手,不允许我以这种方式解决它。

感谢 Daniel Spiewak 在这个线程中的回复,我能够得到以下工作。我通过添加改进了他的解决方案,onHalt以便我的应用程序在Process完成后退出。

def split[A](p: Process[Task, A], limit: Int = 10): Process[Task, (Process[Task, A], Process[Task, A])] = {
  val left = async.boundedQueue[A](limit)
  val right = async.boundedQueue[A](limit)

  val enqueue = p.observe(left.enqueue).observe(right.enqueue).drain.onHalt { cause =>
    Process.await(Task.gatherUnordered(Seq(left.close, right.close))){ _ => Halt(cause) }
  }
  val dequeue = Process((left.dequeue, right.dequeue))

  enqueue merge dequeue
}
于 2016-09-02T21:53:23.267 回答