4

我很难pipe用这个签名写一个:

toOneBigList :: (Monad m, Proxy p) => () -> Pipe p a [a] m r

它应该简单地a从上游获取所有 s 并将它们发送到下游列表中。

我所有的尝试看起来都从根本上失败了。

谁能指出我正确的方向?

4

2 回答 2

9

有两种pipes基于解决方案,我会让您选择您喜欢的一种。

注意:不清楚为什么在下游接口上输出列表而不是直接返回结果。

导管式

第一个,非常接近conduit基于 -based 的解决方案,使用即将推出的pipes-pase,它基本上是完整的,只需要文档。你可以在 Github 上找到最新的草稿。

使用pipes-parse,解决方案与conduitPetr 给出的解决方案相同:

import Control.Proxy
import Control.Proxy.Parse

combine
    :: (Monad m, Proxy p)
    => () -> Pipe (StateP [Maybe a] p) (Maybe a) [a] m ()
combine () = loop []
  where
    loop as = do
        ma <- draw
        case ma of
            Nothing -> respond (reverse as)
            Just a  -> loop (a:as)

draw就像conduit's :如果缓冲区为空, await它会从剩余缓冲区(即部分)或上游请求一个值。表示文件结束。StatePNothing

wrap您可以使用函数 from包装没有文件结束信号的管道,该函数pipes-parse具有以下类型:

wrap :: (Monad m, Proxy p) => p a' a b' b m r -> p a' a b' (Maybe b) m s

经典管道风格

第二种选择更简单一些。如果要折叠给定的管道,可以直接使用WriterP

import Control.Proxy
import Control.Proxy.Trans.Writer

foldIt
  :: (Monad m, Proxy p) =>
     (() -> Pipe p a b m ()) -> () -> Pipe p a [b] m ()
foldIt p () = runIdentityP $ do
    r <- execWriterK (liftP . p >-> toListD >-> unitU) ()
    respond r

这是对正在发生的事情的更高级别的描述,但它需要将管道作为显式参数传递。这取决于你喜欢哪一个。

顺便说一句,这就是我问你为什么要向下游发送单个值的原因。如果您返回折叠列表,则上述内容要简单得多:

foldIt p = execWriterK (liftP . p >-> toListD)

如果它的代理类型是完全多态的,甚至liftP可能不需要。p我只是把它作为预防措施。

奖金解决方案

pipes-parse没有提供的原因toOneBigList是,将结果分组到列表中始终是一种管道反模式。 pipes有几个不错的功能,即使您尝试生成多个列表,也不必将输入分组到列表中。例如,使用respond组合,您可以让代理产生它要遍历的流的子集,然后注入使用该子集的处理程序:

example :: (Monad m, Proxy p) => () -> Pipe p a (() -> Pipe p a a m ()) m r
example () = runIdentityP $ forever $ do
    respond $ \() -> runIdentityP $ replicateM_ 3 $ request () >>= respond

printIt :: (Proxy p, Show a) => () -> Pipe p a a IO r
printIt () = runIdentityP $ do
    lift $ putStrLn "Here we go!"
    printD ()

useIt :: (Proxy p, Show a) => () -> Pipe p a a IO r
useIt = example />/ (\p -> (p >-> printIt) ())

以下是如何使用它的示例:

>>> runProxy $ enumFromToS 1 10 >-> useIt
Here we go!
1
2
3
Here we go!
4
5
6
Here we go!
7
8
9
Here we go!
10

这意味着即使您需要对元素进行分组,也永远不需要将单个元素放入内存。

于 2013-05-28T14:29:52.637 回答
2

我只会给出部分答案,也许其他人会有更好的答案。

据我所知,标准管道没有检测管道另一部分何时终止的机制。终止的第一个管道产生管道的最终结果,所有其他管道都被丢弃。因此,如果您有一个永远消耗输入的管道(最终生成一个列表),那么它在上游完成时将没有机会采取行动并产生输出。(这是有意的,因此上游和下游部分彼此是双重的。)也许这在管道顶部的一些图书馆建筑中得到了解决。

情况与管道不同。它具有消费功能,可将所有输入组合到一个列表中并返回(而不是输出)它。编写一个你需要的函数,在最后输出列表,并不难:

import Data.Conduit

combine :: (Monad m) => Conduit a m [a]
combine = loop []
  where
    loop xs = await >>= maybe (yield $ reverse xs) (loop . (: xs))
于 2013-05-28T12:50:35.777 回答