11

可以使用流(惰性列表)从多个(为简单起见说两个)源中按需提取。Iteratees 可用于处理来自单一来源的数据。

是否有类似 Iteratee 的功能概念来处理多个输入源?我可以想象一个 Iteratee 的状态信号,它想要从哪个源中提取。

4

4 回答 4

15

要使用管道来做到这一点,您将 Pipe monad 转换器嵌套在其自身中,对于您希望与之交互的每个生产者一次。例如:

import Control.Monad
import Control.Monad.Trans
import Control.Pipe

producerA, producerB :: (Monad m) => Producer Int m ()
producerA = mapM_ yield [1,2,3]
producerB = mapM_ yield [4,5,6]

consumes2 :: (Show a, Show b) =>
    Consumer a (Consumer b IO) r
consumes2 = forever $ do
    a <- await       -- await from outer producer
    b <- lift await  -- await from inner producer
    lift $ lift $ print (a, b)

就像多个变量的 Haskell 柯里化函数一样,您可以使用组合和 runPipe 将其部分应用于每个源:

consumes1 :: (Show b) => Consumer b IO ()
consumes1 = runPipe $ consumes2 <+< producerA

fullyApplied :: IO ()
fullyApplied = runPipe $ consumes1 <+< producerB

上述函数运行时输出:

>>> fullyApplied
(1, 4)
(2, 5)
(3, 6)

这个技巧适用于让或等待上游或下游任意数量的管道。它也适用于代理,即管道的双向类似物。

编辑:请注意,这也适用于任何 iteratee 库,而不仅仅是pipes. 事实上,John Milikin 和 Oleg 是这种方法的最初倡导者,我只是从他们那里窃取了这个想法。

于 2012-09-19T15:12:01.043 回答
6

我们在 Scala 中使用Machines来获取不只是两个,而是任意数量的源。

库本身在Tee模块上提供了二元连接的两个示例:mergeOuterJoinhashJoin. 下面是代码的hashJoin样子(假设两个流都已排序):

/**
 * A natural hash join according to keys of type `K`.
 */
def hashJoin[A, B, K](f: A => K, g: B => K): Tee[A, B, (A, B)] = {
  def build(m: Map[K, A]): Plan[T[A, B], Nothing, Map[K, A]] = (for {
    a  <- awaits(left[A])
    mp <- build(m + (f(a) -> a))
  } yield mp) orElse Return(m)
  for {
    m <- build(Map())
    r <- (awaits(right[B]) flatMap (b => {
      val k = g(b)
      if (m contains k) emit(m(k) -> b) else Return(())
    })) repeatedly
  } yield r
}

这段代码使用方法构建了一个Plan“编译”为 aMachine的代码repeatedly。这里构建的类型是Tee[A, B, (A, B)]具有两个输入的机器。awaits(left)用和请求左右输入,用awaits(right)输出emit

还有一个Haskell 版本的 Machines

于 2012-09-19T17:55:58.387 回答
3

管道(并且,它可以为管道构建,但该代码尚未发布)有一个zip原语,它接受两个上游并将它们组合为一个元组流。

于 2012-09-19T14:52:38.130 回答
1

查看管道库垂直连接可能会满足您的要求。例如,

import Control.Pipe
import Control.Monad
import Control.Monad.State
import Data.Void

source0, source1 :: Producer Char IO ()
source0 = mapM_ yield "say"
source1 = mapM_ yield "what"

sink :: Show b => Consumer b IO ()
sink = forever $ await >>= \x -> lift $ print x

pipeline :: Pipe () Void IO ()
pipeline = sink <+< (source0 >> source1)

排序运算符(>>)垂直连接源,产生输出(在 a 上runPipe

's'
'a'
'y'
'w'
'h'
'a'
't'
于 2012-09-19T14:41:11.627 回答