我想按照 ArrowChoice 的方式做一些事情,但要使用管道。我想等待一个 Either 值,然后将 Left 值传递给一个管道,将 Right 值传递给另一个,然后将结果合并回 Either 流。


newtype AutomataM i m o = Automata (i -> m (o, Automata i o))

conduitStep :: Conduit i m o -> AutomataM i m [o]

输出列表的原因是导管可能为每个输入产生 0 个或多个输出。

我查看了 ResumableConduit 及其亲属,大概答案就在某处。但我不太明白它是如何完成的。


import Data.Conduit
import Data.Conduit.Internal (Pipe (..), ConduitM (..))

newtype Automata i o m r = Automata (m ([o], Either r (i -> Automata i o m r)))

conduitStep :: Monad m => ConduitM i o m r -> Automata i o m r
conduitStep (ConduitM con0) =
    Automata $ go [] id con0
    go _ front (Done r) = return (front [], Left r)
    go ls front (HaveOutput p _ o) = go ls (front . (o:)) p
    go ls front (NeedInput p _) =
        case ls of
            [] -> return (front [], Right $ conduitStep . ConduitM . p)
            l:ls' -> go ls' front (p l)
    go ls front (PipeM mp) = mp >>= go ls front
    go ls front (Leftover p l) = go (l:ls) front p


  1. 通过将输出保留为列表,它不是恒定的内存。
  2. 我们正在丢弃终结器。


编辑我最终ZipConduit在导管额外 0.1.5 中实施。这是一个使用它的演示,听起来有点像你的情况:

import           Control.Applicative
import           Data.Conduit
import           Data.Conduit.Extra
import qualified Data.Conduit.List   as CL

conduit1 :: Monad m => Conduit Int m String
conduit1 = CL.map $ \i -> "conduit1: " ++ show i

conduit2 :: Monad m => Conduit Double m String
conduit2 = CL.map $ \d -> "conduit2: " ++ show d

conduit :: Monad m => Conduit (Either Int Double) m String
conduit = getZipConduit $
    ZipConduit (lefts =$= conduit1) *>
    ZipConduit (rights =$= conduit2)
    lefts = CL.mapMaybe (either Just (const Nothing))
    rights = CL.mapMaybe (either (const Nothing) Just)

main :: IO ()
main = do
    let src = do
            yield $ Left 1
            yield $ Right 2
            yield $ Left 3
            yield $ Right 4
        sink = CL.mapM_ putStrLn
    src $$ conduit =$ sink
pipes有一种使用“推送类别”管道的民间方法。完整的实现来自这个邮件列表帖子这个 Stack Overflow 答案。我认为它还没有发布,原因是为了简化Pipes接口,专注于使用通过这个方法隐藏的“排序”monad 实例,并且还没有证据证明这个实现真正正确地实现了 Arrow 类。

这个想法是实现一个Edge新类型(如下所示),它是一个基于推送的管道,其类型参数以正确的顺序用于CategoryArrowArrowChoice两者Functor以及Applicative它们的输出值。这使您可以使用箭头符号将它们组合成有向无环图。我将在下面运行实现,但忽略它并使用Arrow/ ArrowChoice/Applicative实例而Edge不用过多关注它是安全的。


{-# LANGUAGE RankNTypes           #-}
{-# LANGUAGE TypeSynonymInstances #-}

import Prelude hiding ((.), id)
import Pipes.Core
import Pipes.Lift
import Control.Monad.Morph
import Control.Category
import Control.Monad.State.Strict
import Control.Arrow

这是一种非典型的管道使用模式,并没有暴露在Pipes模块中;您必须导入Pipes.Core才能使用push. 基于推的管道看起来像

-- push :: a -> Proxy a' a a' a m r

Proxy因此,在允许运行之前,它们需要至少一个上游值。这意味着需要通过将第一个值作为函数调用传递来“启动”整个过程,并且最左边的 push-Proxy将控制整个流。


newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }

例如Category,我们使用具有pushasid(<~<)as 组合的“push”类别:

instance Monad m => Category (Edge m r) where
  id = Edge push
  Edge a . Edge b = Edge (a <~< b)

我们通过在向下边缘增加(ie )将函数嵌入到Edgewith中。为此,我们使用了有规律的类别,但是却把我们塞进了这个过程中。arridpushrespondp />/ respond == pf

instance Monad m => Arrow (Edge m r) where
  arr f = Edge (push />/ respond . f)


  first (Edge p) = Edge $ \(b, d) ->
    evalStateP d $ (up \>\ hoist lift . p />/ dn) b
      up () = do
        (b, d) <- request ()
        lift (put d)
        return b
      dn c = do
        d <- lift get
        respond (c, d)


instance (Monad m) => ArrowChoice (Edge m r) where
    left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn)))
          bef x = case x of
              Left b -> return b
              Right d -> do
                  _ <- respond (Right d)
                  x2 <- request ()
                  bef x2
          up () = do
              x <- request ()
              bef x
          dn c = respond (Left c)


type PProducer m r b =            Edge m r () b
type PConsumer m r a = forall b . Edge m r a  b

然后我们Functor将为. 这是对底层的分析,所以有点冗长。然而,本质上,所发生的只是我们将.ApplicativePProducercasePipefyieldPipe

instance Functor (PProducer m r) where
  fmap f (Edge k) = $ Edge $ \() -> go (k ()) where
    go p = case p of
      Request () ku -> Request ()    (\() -> go (ku ()))
      -- This is the only interesting line
      Respond b  ku -> Respond (f b) (\() -> go (ku ()))
      M          m  -> M (m >>= \p' -> return (go p'))  
      Pure    r     -> Pure r


instance (Monad m) => Applicative (Edge m r ()) where
    pure b = Edge $ \() -> forever $ respond b
    (Edge k1) <*> (Edge k2) = Edge (\() -> goL (k1 ()) (k2 ()))
        goL p1 p2 = case p1 of
            Request () ku -> Request () (\() -> goL   (ku ()) p2)
            Respond f  ku ->                    goR f (ku ()) p2
            M          m  -> M (m >>= \p1' -> return (goL p1' p2))
            Pure    r     -> Pure r
        goR f p1 p2 = case p2 of
            Request () ku -> Request ()    (\() -> goR f p1 (ku ()))
            Respond x  ku -> Respond (f x) (\() -> goL   p1 (ku ()))
            M          m  -> M (m >>= \p2' -> return (goR f p1 p2'))
            Pure    r     -> Pure r
