9

几天前的一个观点,我问了这个问题。现在我需要这个函数的纯单线程版本:

重复一遍,我需要一个函数将每个接收到的值发送到每个接收器并收集它们的结果。函数的类型签名应该是这样的:

broadcast :: [Sink a m b] -> Sink a m [b]

最佳斯文


PS不是sequence,我试过了:

> C.sourceList [1..100] $$ sequence [C.fold (+) 0, C.fold (+) 0]
[5050, 0]

预期结果:

[5050, 5050]

PPSzipSinks给出了想要的结果,但它只适用于元组:

> C.sourceList [1..100] $$ C.zipSinks (C.fold (+) 0) (C.fold (+) 0)
(5050, 5050)
4

1 回答 1

10

基本上我们需要做的就是重新实现sequence,但是用zipSinks而不是原来的排序操作:

import Data.Conduit as C
import Data.Conduit.List as C
import Data.Conduit.Util as C

fromPairs
    :: (Functor f)
    => f [a]                        -- ^ an empty list to start with
    -> (f a -> f [a] -> f (a, [a])) -- ^ a combining function
    -> [f a]                        -- ^ input list
    -> f [a]                        -- ^ combined list
fromPairs empty comb = g
  where
    g []     = empty
    g (x:xs) = uncurry (:) `fmap` (x `comb` g xs)

现在创建broadcast只是适用fromPairszipSinks

broadcast :: (Monad m) => [Sink a m b] -> Sink a m [b]
broadcast = fromPairs (return []) zipSinks

我们可以做类似的事情

main = C.sourceList [1..100] $$ broadcast [C.fold (+) 0, C.fold (*) 1]

更新:我们可以看到这fromPairs看起来很公平 sequenceA,因此我们可以进一步推动这个想法。让我们在管道上定义一个 zipping applicative functor,类似于ZipList

import Control.Applicative
import Control.Monad
import Data.Conduit
import Data.Conduit.Util
import Data.Traversable (Traversable(..), sequenceA)

newtype ZipSink i m r = ZipSink { getZipSink :: Sink i m r }

instance Monad m => Functor (ZipSink i m) where
    fmap f (ZipSink x) = ZipSink (liftM f x)
instance Monad m => Applicative (ZipSink i m) where
    pure  = ZipSink . return
    (ZipSink f) <*> (ZipSink x) =
         ZipSink $ liftM (uncurry ($)) $ zipSinks f x

然后broadcast变得很简单

broadcast :: (Traversable f, Monad m) => f (Sink i m r) -> Sink i m (f r)
broadcast = getZipSink . sequenceA . fmap ZipSink
于 2013-08-15T13:21:09.607 回答