9

我正在尝试创建一个可以使用多个输入流的管道。我需要能够以无特定顺序(例如,不交替)等待一个或另一个输入流,从而使 zip 无用。这里没有任何并行或不确定的事情发生:我在一个流或另一个流上等待。我希望能够编写类似于以下的代码(分别在第一个或第二个输入流上的 whereawaitAawaitBawait ):

do
  _ <- awaitA
  x <- awaitA
  y <- awaitB
  yield (x,y)
  _ <- awaitB
  _ <- awaitB
  y' <- awaitB
  yield (x,y')

我最好的解决方案是让内部单子成为另一个管道,例如

foo :: Sink i1 (ConduitM i2 o m) ()

然后允许

awaitA = await
awaitB = lift await

这主要是有效的。不幸的是,这似乎使得在外导管完全连接之前很难熔合到内导管。我尝试的第一件事是:

fuseInner :: Monad m =>
                Conduit i2' m i2 -> 
                Sink i1 (ConduitM i2 o m) () -> 
                Sink i1 (ConduitM i2' o m) ()
fuseInner x = transPipe (x =$=)

但这不起作用,至少在x有状态的情况下,因为(x =$=)多次运行,x每次都有效地重新启动。

有没有办法写 fuseInner,而不是闯入导管的内部(看起来会很乱)?有没有更好的方法来处理多个输入流?我只是远远超出了管道的设计目的吗?

谢谢!

4

2 回答 2

3

如果您想组合两个IO生成的流,那么加布里埃尔的评论就是解决方案。

否则,您不能等待两个流,哪个流首先产生一个值。导管是单线程和确定性的 - 它一次只处理一个管道。但是您可以创建一个函数来交错两个流,让它们决定何时切换:

{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
import Control.Monad (liftM)
import Data.Conduit.Internal (
    Pipe (..), Source, Sink,
    injectLeftovers, ConduitM (..),
    mapOutput, mapOutputMaybe
  )

-- | Alternate two given sources, running one until it yields `Nothing`,
-- then switching to the other one.
merge :: Monad m
      => Source m (Maybe a)
      -> Source m (Maybe b)
      -> Source m (Either a b)
merge (ConduitM l) (ConduitM r) = ConduitM $ goL l r
  where
    goL :: Monad m => Pipe () () (Maybe a) () m () 
                   -> Pipe () () (Maybe b) () m ()
                   -> Pipe () () (Either a b) () m ()
    goL (Leftover l ()) r           = goL l r
    goL (NeedInput _ c) r           = goL (c ()) r
    goL (PipeM mx) r                = PipeM $ liftM (`goL` r) mx
    goL (Done _) r                  = mapOutputMaybe (liftM Right) r
    goL (HaveOutput c f (Just o)) r = HaveOutput (goL c r) f (Left o)
    goL (HaveOutput c f Nothing) r  = goR c r
    -- This is just a mirror copy of goL. We should combine them together to
    -- avoid code repetition.
    goR :: Monad m => Pipe () () (Maybe a) () m ()
                   -> Pipe () () (Maybe b) () m ()
                   -> Pipe () () (Either a b) () m ()
    goR l (Leftover r ())           = goR l r
    goR l (NeedInput _ c)           = goR l (c ())
    goR l (PipeM mx)                = PipeM $ liftM (goR l) mx
    goR l (Done _)                  = mapOutputMaybe (liftM Left) l
    goR l (HaveOutput c f (Just o)) = HaveOutput (goR l c) f (Right o)
    goR l (HaveOutput c f Nothing)  = goL l c

它处理一个源直到它返回Nothing,然后切换到另一个源,等等。如果一个源完成,另一个源被处理到最后。

例如,我们可以组合和交错两个列表:

import Control.Monad.Trans
import Data.Conduit (($$), awaitForever)
import Data.Conduit.List (sourceList)

main =  (merge (sourceList $ concatMap (\x -> [Just x, Just x, Nothing]) [  1..10])
               (sourceList $ concatMap (\x -> [Just x, Nothing]) [101..110]) )
         $$ awaitForever (\x -> lift $ print x)

如果您需要多个来源,merge可以适应类似的东西

mergeList :: Monad m => [Source m (Maybe a)] -> Source m a

这将在给定的源列表中循环,直到它们全部完成。

于 2013-03-24T09:45:09.533 回答
3

可以通过潜入导管内部来完成。我想避免这种情况,因为它看起来非常混乱。根据这里的回复,听起来没有办法解决它(但我真的很感激更清洁的解决方案)。

关键难点在于它(x =$=)是一个纯函数,但要transPipe给出正确答案,它需要一种有状态的、类似函数的东西:

data StatefulMorph m n = StatefulMorph
    { stepStatefulMorph :: forall a. m a -> n (StatefulMorph m n, a)
    , finalizeStatefulMorph :: n () }

步进StatefulMorph m n接受一个值 inm并返回 in n,该值和下一个StatefulMorph应该用于转换下一个m值。最后一个StatefulMorph应该最终确定(在“有状态(x =$=)”的情况下,最终确定x管道。

Conduit fusion 可以实现为StatefulMorph, 使用代码pipeL稍作改动。签名是:

fuseStateful :: Monad m
             => Conduit a m b
             -> StatefulMorph (ConduitM b c m) (ConduitM a c m)

我还需要替换使用值而不是函数的transPipe(的特殊情况)。hoistStatefulMorph

class StatefulHoist t where
    statefulHoist :: (Monad m, Monad n)
                  => StatefulMorph m n
                  -> t m r -> t n r

可以使用 for 的代码编写一个 for 的StatefulHoist实例,并进行一些小的更改。ConduitM i otransPipe

fuseInner然后很容易实现。

fuseInner :: Monad m
          => Conduit a m b
          -> ConduitM i o (ConduitM b c m) r
          -> ConduitM i o (ConduitM a c m) r
fuseInner left = statefulHoist (fuseStateful left)

我在这里写了更详细的解释,并这里发布了完整的代码。如果有人可以提出更清洁的解决方案,或者使用管道公共 API 的解决方案,请发布。

感谢所有的建议和意见!

于 2013-03-25T01:33:18.980 回答