使用 scalaz-stream 是否可以拆分/分叉然后重新加入流?
例如,假设我有以下功能
val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)
val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add)
val sumOfOddNumbers = streamOfNumbers.filter(isOdd).fold(0)(add)
zip( sumOfEven, sumOfOdd ).to( someEffectfulFunction )
使用 scalaz-stream,在此示例中,结果将如您所料 - 一个从 1 到 10 的数字元组传递到接收器。
但是,如果我们替换streamOfNumbers
为需要 IO 的东西,它实际上会执行两次 IO 操作。
使用 aTopic
我可以创建一个 pub/sub 进程,它可以正确地复制流中的元素,但它不会缓冲 - 它只是尽可能快地使用整个源,而不管接收器消耗它的速度如何。
我可以将它包装在一个有界队列中,但是最终结果感觉比它需要的要复杂得多。
有没有一种更简单的方法可以在 scalaz-stream 中拆分流而无需从源中重复 IO 操作?