4

我想按照 ArrowChoice 的方式做一些事情,但要使用管道。我想等待一个 Either 值,然后将 Left 值传递给一个管道,将 Right 值传递给另一个,然后将结果合并回 Either 流。

大概这可以通过使内部管道像自动机一样来完成:将管道变成一个函数,该函数接受一个参数并返回一个产生的输出的单子列表:

newtype AutomataM i m o = Automata (i -> m (o, Automata i o))

conduitStep :: Conduit i m o -> AutomataM i m [o]

输出列表的原因是导管可能为每个输入产生 0 个或多个输出。

我查看了 ResumableConduit 及其亲属,大概答案就在某处。但我不太明白它是如何完成的。

4

2 回答 2

3

它与您提供的类型签名不完全相同,但是:

import Data.Conduit
import Data.Conduit.Internal (Pipe (..), ConduitM (..))

newtype Automata i o m r = Automata (m ([o], Either r (i -> Automata i o m r)))

conduitStep :: Monad m => ConduitM i o m r -> Automata i o m r
conduitStep (ConduitM con0) =
    Automata $ go [] id con0
  where
    go _ front (Done r) = return (front [], Left r)
    go ls front (HaveOutput p _ o) = go ls (front . (o:)) p
    go ls front (NeedInput p _) =
        case ls of
            [] -> return (front [], Right $ conduitStep . ConduitM . p)
            l:ls' -> go ls' front (p l)
    go ls front (PipeM mp) = mp >>= go ls front
    go ls front (Leftover p l) = go (l:ls) front p

但请注意这种方法:

  1. 通过将输出保留为列表,它不是恒定的内存。
  2. 我们正在丢弃终结器。

可能有一种方法可以提供ZipConduit类似于ZipSourceand的抽象,ZipSink可以更优雅地处理此类问题,但我没有考虑太多。


编辑我最终ZipConduit在导管额外 0.1.5 中实施。这是一个使用它的演示,听起来有点像你的情况:

import           Control.Applicative
import           Data.Conduit
import           Data.Conduit.Extra
import qualified Data.Conduit.List   as CL

conduit1 :: Monad m => Conduit Int m String
conduit1 = CL.map $ \i -> "conduit1: " ++ show i

conduit2 :: Monad m => Conduit Double m String
conduit2 = CL.map $ \d -> "conduit2: " ++ show d

conduit :: Monad m => Conduit (Either Int Double) m String
conduit = getZipConduit $
    ZipConduit (lefts =$= conduit1) *>
    ZipConduit (rights =$= conduit2)
  where
    lefts = CL.mapMaybe (either Just (const Nothing))
    rights = CL.mapMaybe (either (const Nothing) Just)

main :: IO ()
main = do
    let src = do
            yield $ Left 1
            yield $ Right 2
            yield $ Left 3
            yield $ Right 4
        sink = CL.mapM_ putStrLn
    src $$ conduit =$ sink
于 2014-02-10T17:40:59.007 回答
1

pipes有一种使用“推送类别”管道的民间方法。完整的实现来自这个邮件列表帖子这个 Stack Overflow 答案。我认为它还没有发布,原因是为了简化Pipes接口,专注于使用通过这个方法隐藏的“排序”monad 实例,并且还没有证据证明这个实现真正正确地实现了 Arrow 类。

这个想法是实现一个Edge新类型(如下所示),它是一个基于推送的管道,其类型参数以正确的顺序用于CategoryArrowArrowChoice两者Functor以及Applicative它们的输出值。这使您可以使用箭头符号将它们组合成有向无环图。我将在下面运行实现,但忽略它并使用Arrow/ ArrowChoice/Applicative实例而Edge不用过多关注它是安全的。

编辑:此代码最好在https://github.com/Gabriel439/Haskell-RCPL-Library上提供)


{-# LANGUAGE RankNTypes           #-}
{-# LANGUAGE TypeSynonymInstances #-}

import Prelude hiding ((.), id)
import Pipes.Core
import Pipes.Lift
import Control.Monad.Morph
import Control.Category
import Control.Monad.State.Strict
import Control.Arrow

这是一种非典型的管道使用模式,并没有暴露在Pipes模块中;您必须导入Pipes.Core才能使用push. 基于推的管道看起来像

-- push :: a -> Proxy a' a a' a m r

Proxy因此,在允许运行之前,它们需要至少一个上游值。这意味着需要通过将第一个值作为函数调用传递来“启动”整个过程,并且最左边的 push-Proxy将控制整个流。

给定一个基于推送的管道,我们可以实现Category和。标准解决方案还涉及类型类,以便我们以正确的顺序获得类型参数和ArrowArrowChoiceEdgeCategoryArrow

newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }

例如Category,我们使用具有pushasid(<~<)as 组合的“push”类别:

instance Monad m => Category (Edge m r) where
  id = Edge push
  Edge a . Edge b = Edge (a <~< b)

我们通过在向下边缘增加(ie )将函数嵌入到Edgewith中。为此,我们使用了有规律的类别,但是却把我们塞进了这个过程中。arridpushrespondp />/ respond == pf

instance Monad m => Arrow (Edge m r) where
  arr f = Edge (push />/ respond . f)

我们还使用本地状态转换器来存储snd我们对的一半并将其“绕过”输入管道first

  first (Edge p) = Edge $ \(b, d) ->
    evalStateP d $ (up \>\ hoist lift . p />/ dn) b
    where
      up () = do
        (b, d) <- request ()
        lift (put d)
        return b
      dn c = do
        d <- lift get
        respond (c, d)

最后,我们ArrowChoice通过实现来获得一个实例left。为此,我们使用返回或管道来分担传递值Left的负担。Right

instance (Monad m) => ArrowChoice (Edge m r) where
    left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn)))
      where
          bef x = case x of
              Left b -> return b
              Right d -> do
                  _ <- respond (Right d)
                  x2 <- request ()
                  bef x2
          up () = do
              x <- request ()
              bef x
          dn c = respond (Left c)

我们可以使用Edge创建“基于推送”的生产者和消费者

type PProducer m r b =            Edge m r () b
type PConsumer m r a = forall b . Edge m r a  b

然后我们Functor将为. 这是对底层的分析,所以有点冗长。然而,本质上,所发生的只是我们将.ApplicativePProducercasePipefyieldPipe

instance Functor (PProducer m r) where
  fmap f (Edge k) = $ Edge $ \() -> go (k ()) where
    go p = case p of
      Request () ku -> Request ()    (\() -> go (ku ()))
      -- This is the only interesting line
      Respond b  ku -> Respond (f b) (\() -> go (ku ()))
      M          m  -> M (m >>= \p' -> return (go p'))  
      Pure    r     -> Pure r

最后,Applicative除了我们必须在运行上游管道以产生函数和运行下游管道以产生参数之间切换之外,几乎相同。

instance (Monad m) => Applicative (Edge m r ()) where
    pure b = Edge $ \() -> forever $ respond b
    (Edge k1) <*> (Edge k2) = Edge (\() -> goL (k1 ()) (k2 ()))
      where
        goL p1 p2 = case p1 of
            Request () ku -> Request () (\() -> goL   (ku ()) p2)
            Respond f  ku ->                    goR f (ku ()) p2
            M          m  -> M (m >>= \p1' -> return (goL p1' p2))
            Pure    r     -> Pure r
        goR f p1 p2 = case p2 of
            Request () ku -> Request ()    (\() -> goR f p1 (ku ()))
            Respond x  ku -> Respond (f x) (\() -> goL   p1 (ku ()))
            M          m  -> M (m >>= \p2' -> return (goR f p1 p2'))
            Pure    r     -> Pure r
于 2014-02-10T21:14:29.577 回答