4

这个问题有点 codegolf 和很多新手。

pipes在 Haskell 中使用了很棒的库,我想拆分管道以沿多个通道发送相同的数据(进行广播)。本Pipes.Concurrent教程建议使用spawn来创建邮箱,利用Output's monoid 状态。例如,我们可能会做这样的事情:

main = do
 (output1, input1) <- spawn Unbounded
 (output2, input2) <- spawn Unbounded
 let effect1 = fromInput input1 >-> pipe1
 let effect2 = fromInput input2 >-> pipe2
 let effect3 = P.stdinLn >-> toOutput (output1 <> output2)
 ...

这种间接通过邮箱真的有必要吗?我们可以写这样的东西吗?

main = do
 let effect3 = P.stdinLn >-> (pipe1 <> pipe2)
 ...

上面没有编译,因为Pipe没有Monoid实例。这有充分的理由吗?第一种方法真的是分割管道最干净的方法吗?

4

1 回答 1

8

有两种方法可以在不使用并发的情况下做到这一点,但都需要注意。

第一种方法是 ifpipe1pipe2are just simple Consumers 永远循环,如:

p1 = for cat f  -- i.e. p1 = forever $ await >>= f
p2 = for cat g  -- i.e. p2 = forever $ await >>= g

...那么解决这个问题的简单方法就是写:

for P.stdinLn $ \str -> do
    f str
    g str

例如,如果p1只是printing 每个值:

p1 = for cat (lift . print)

...并且p2只是将该值写入句柄:

p2 = for cat (lift . hPutStrLn h)

...然后你会像这样组合它们:

for P.stdinLn $ \str -> do
    lift $ print str
    lift $ hPutStrLn h str

但是,这种简化仅适用于Consumer简单循环的 s。还有另一种更通用的解决方案,即为ArrowChoice管道定义一个实例。我相信 pull-basedPipe不允许正确的守法实例,但 push-based 允许Pipe

newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }

instance (Monad m) => Category (Edge m r) where
    id = Edge push
    (Edge p2) . (Edge p1) = Edge (p1 >~> p2)

instance (Monad m) => Arrow (Edge m r) where
    arr f = Edge (push />/ respond . f)
    first (Edge p) = Edge $ \(b, d) ->
        evalStateP d $ (up \>\ unsafeHoist lift . p />/ dn) b
      where
        up () = do
            (b, d) <- request ()
            lift $ put d
            return b
        dn c = do
            d <- lift get
            respond (c, d)

instance (Monad m) => ArrowChoice (Edge m r) where
    left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn)))
      where
          bef x = case x of
              Left b -> return b
              Right d -> do
                  _ <- respond (Right d)
                  x2 <- request ()
                  bef x2
          up () = do
              x <- request ()
              bef x
          dn c = respond (Left c)

这需要一个新类型,以便类型参数符合ArrowChoice预期的顺序。

如果你不熟悉 push-based 这个词Pipe,它基本上是Pipe从最上游的管道而不是最下游的管道开始的,它们都具有以下形状:

a -> Pipe a b m r

将其视为Pipe在从上游接收到至少一个值之前无法“运行”。

这些 push-based Pipes 是传统 pull-based s 的“双重” Pipe,具有自己的组合运算符和身份:

(>~>) :: (Monad m)
      => (a -> Pipe a b m r)
      -> (b -> Pipe b c m r)
      -> (a -> Pipe a c m r)

push  :: (Monad m)
      ->  a -> Pipe a a m r

...但默认情况下单向PipesAPI 不会导出它。您只能从中获取这些运算符Pipes.Core(并且您可能希望更仔细地研究该模块以建立对它们如何工作的直觉)。该模块显示 push-based Pipes 和 pull-based Pipes 都是更通用的双向版本的特殊情况,理解双向情况是您了解它们为什么彼此对偶的方式。

一旦有了Arrow基于推送的管道的实例,就可以编写如下内容:

p >>> bifurcate >>> (p1 +++ p2)
  where
    bifurcate = Edge $ pull ~> \a -> do
        yield (Left  a)  -- First give `p1` the value
        yield (Right a)  -- Then give `p2` the value

然后,您可以在runEdge完成后将其转换为基于拉动的管道。

这种方法有一个主要缺点,即您不能自动将基于拉的管道升级为基于推送的管道(但通常不难弄清楚如何手动进行)。例如,要升级Pipes.Prelude.map为基于推送的Pipe,您可以编写:

mapPush :: (Monad m) => (a -> b) -> (a -> Pipe a b m r)
mapPush f a = do
    yield (f a)
    Pipes.Prelude.map f

然后它有正确的类型被包裹在Arrow

mapEdge :: (Monad m) => (a -> b) -> Edge m r a b
mapEdge f = Edge (mapPush f)

当然,更简单的方法是从头开始编写:

mapEdge f = Edge $ push ~> yield . f

使用最适合您的方法。

事实上,我提出ArrowandArrowChoice实例正是因为我试图回答与您完全相同的问题:您如何在不使用并发的情况下解决这些问题?我在此处的另一个 Stack Overflow 答案中写了一个关于这个更一般主题的长答案,我在其中描述了如何使用这些ArrowArrowChoice实例将并发系统提炼成等效的纯系统。

于 2013-11-04T00:14:44.703 回答