可以使用流(惰性列表)从多个(为简单起见说两个)源中按需提取。Iteratees 可用于处理来自单一来源的数据。
是否有类似 Iteratee 的功能概念来处理多个输入源?我可以想象一个 Iteratee 的状态信号,它想要从哪个源中提取。
可以使用流(惰性列表)从多个(为简单起见说两个)源中按需提取。Iteratees 可用于处理来自单一来源的数据。
是否有类似 Iteratee 的功能概念来处理多个输入源?我可以想象一个 Iteratee 的状态信号,它想要从哪个源中提取。
要使用管道来做到这一点,您将 Pipe monad 转换器嵌套在其自身中,对于您希望与之交互的每个生产者一次。例如:
import Control.Monad
import Control.Monad.Trans
import Control.Pipe
producerA, producerB :: (Monad m) => Producer Int m ()
producerA = mapM_ yield [1,2,3]
producerB = mapM_ yield [4,5,6]
consumes2 :: (Show a, Show b) =>
Consumer a (Consumer b IO) r
consumes2 = forever $ do
a <- await -- await from outer producer
b <- lift await -- await from inner producer
lift $ lift $ print (a, b)
就像多个变量的 Haskell 柯里化函数一样,您可以使用组合和 runPipe 将其部分应用于每个源:
consumes1 :: (Show b) => Consumer b IO ()
consumes1 = runPipe $ consumes2 <+< producerA
fullyApplied :: IO ()
fullyApplied = runPipe $ consumes1 <+< producerB
上述函数运行时输出:
>>> fullyApplied
(1, 4)
(2, 5)
(3, 6)
这个技巧适用于让或等待上游或下游任意数量的管道。它也适用于代理,即管道的双向类似物。
编辑:请注意,这也适用于任何 iteratee 库,而不仅仅是pipes
. 事实上,John Milikin 和 Oleg 是这种方法的最初倡导者,我只是从他们那里窃取了这个想法。
我们在 Scala 中使用Machines来获取不只是两个,而是任意数量的源。
库本身在Tee
模块上提供了二元连接的两个示例:mergeOuterJoin
和hashJoin
. 下面是代码的hashJoin
样子(假设两个流都已排序):
/**
* A natural hash join according to keys of type `K`.
*/
def hashJoin[A, B, K](f: A => K, g: B => K): Tee[A, B, (A, B)] = {
def build(m: Map[K, A]): Plan[T[A, B], Nothing, Map[K, A]] = (for {
a <- awaits(left[A])
mp <- build(m + (f(a) -> a))
} yield mp) orElse Return(m)
for {
m <- build(Map())
r <- (awaits(right[B]) flatMap (b => {
val k = g(b)
if (m contains k) emit(m(k) -> b) else Return(())
})) repeatedly
} yield r
}
这段代码使用方法构建了一个Plan
“编译”为 aMachine
的代码repeatedly
。这里构建的类型是Tee[A, B, (A, B)]
具有两个输入的机器。awaits(left)
用和请求左右输入,用awaits(right)
输出emit
。
还有一个Haskell 版本的 Machines。
管道(并且,它可以为管道构建,但该代码尚未发布)有一个zip
原语,它接受两个上游并将它们组合为一个元组流。
import Control.Pipe
import Control.Monad
import Control.Monad.State
import Data.Void
source0, source1 :: Producer Char IO ()
source0 = mapM_ yield "say"
source1 = mapM_ yield "what"
sink :: Show b => Consumer b IO ()
sink = forever $ await >>= \x -> lift $ print x
pipeline :: Pipe () Void IO ()
pipeline = sink <+< (source0 >> source1)
排序运算符(>>)
垂直连接源,产生输出(在 a 上runPipe
)
's'
'a'
'y'
'w'
'h'
'a'
't'