14

假设我有简单的生产者/消费者模型,消费者希望将一些状态传回给生产者。例如,让下游流动的对象是我们想要写入文件的对象,上游对象是一些表示对象在文件中写入位置的标记(例如偏移量)。

这两个过程可能看起来像这样(带pipes-4.0),

{-# LANGUAGE GeneralizedNewtypeDeriving #-}

import Pipes
import Pipes.Core
import Control.Monad.Trans.State       
import Control.Monad

newtype Object = Obj Int
               deriving (Show)

newtype ObjectId = ObjId Int
                 deriving (Show, Num)

writeObjects :: Proxy ObjectId Object () X IO r
writeObjects = evalStateT (forever go) (ObjId 0)
  where go = do i <- get
                obj <- lift $ request i
                lift $ lift $ putStrLn $ "Wrote "++show obj
                modify (+1)

produceObjects :: [Object] -> Proxy X () ObjectId Object IO ()
produceObjects = go
  where go [] = return ()
        go (obj:rest) = do
            lift $ putStrLn $ "Producing "++show obj
            objId <- respond obj
            lift $ putStrLn $ "Object "++show obj++" has ID "++show objId
            go rest

objects = [ Obj i | i <- [0..10] ]

尽管这可能很简单,但我在推理如何组合它们时遇到了相当大的困难。理想情况下,我们需要一个基于推送的控制流,如下所示,

  1. writeObjects从阻塞开始request,发送初始ObjId 0上游。
  2. produceObjects将第一个对象 , 发送到Obj 0下游
  3. writeObjects写入对象并增加其状态,然后等待request,这次发送ObjId 1上游
  4. respond作为produceObjects回报ObjId 0
  5. produceObjects对第二个对象在步骤 (2) 继续,Obj 1

我最初的尝试是使用基于推送的组合,如下所示,

main = void $ run $ produceObjects objects >>~ const writeObjects

请注意使用const来解决其他不兼容的类型(这可能是问题所在)。然而,在这种情况下,我们发现它ObjId 0被吃掉了,

Producing Obj 0
Wrote Obj 0
Object Obj 0 has ID ObjId 1
Producing Obj 1
...

基于拉动的方法,

main = void $ run $ const (produceObjects objects) +>> writeObjects

遇到类似的问题,这次 drop Obj 0

一个人如何以所需的方式创作这些作品?

4

3 回答 3

14

选择使用哪种组合取决于哪​​个组件应该启动整个过程。如果您希望下游管道启动该过程,那么您希望使用基于拉的组合(即(>+>)/ (+>>)),但如果您希望上游管道启动该过程,那么您应该使用基于推送的组合(即(>>~)/ (>~>))。您收到的类型错误实际上是在警告您代码中存在逻辑错误:您尚未明确确定哪个组件首先启动该过程。

从您的描述中,很明显您希望控制流开始,produceObjects因此您希望使用基于推送的组合。一旦你使用了基于推送的组合,组合操作符的类型将告诉你关于如何修复你的代码所需要知道的一切。我将采用它的类型并将其专门用于您的组合链:

-- Here I'm using the `Server` and `Client` type synonyms to simplify the types
(>>~) :: Server ObjectId Object IO ()
      -> (Object -> Client ObjectId Object IO ())
      -> Effect IO ()

正如您已经注意到的那样,您尝试使用时遇到的类型错误(>>~)告诉您您缺少函数类型的Object参数writeObjectswriteObjects这静态地强制您在收到您的第一个Object(通过初始参数)之前不能运行任何代码。

解决方案是像这样重写你的writeObjects函数:

writeObjects :: Object -> Proxy ObjectId Object () X IO r
writeObjects obj0 = evalStateT (go obj0) (ObjId 0)
  where go obj = do i <- get
                    lift $ lift $ putStrLn $ "Wrote "++ show obj
                    modify (+1)
                    obj' <- lift $ request i
                    go obj'

这会给出正确的行为:

>>> run $ produceObjects objects >>~ writeObjects
Producing Obj 0
Wrote Obj 0
Object Obj 0 has ID ObjId 0
Producing Obj 1
Wrote Obj 1
Object Obj 1 has ID ObjId 1
Producing Obj 2
Wrote Obj 2
Object Obj 2 has ID ObjId 2
Producing Obj 3
Wrote Obj 3
Object Obj 3 has ID ObjId 3
Producing Obj 4
Wrote Obj 4
Object Obj 4 has ID ObjId 4
Producing Obj 5
Wrote Obj 5
Object Obj 5 has ID ObjId 5
Producing Obj 6
Wrote Obj 6
Object Obj 6 has ID ObjId 6
Producing Obj 7
Wrote Obj 7
Object Obj 7 has ID ObjId 7
Producing Obj 8
Wrote Obj 8
Object Obj 8 has ID ObjId 8
Producing Obj 9
Wrote Obj 9
Object Obj 9 has ID ObjId 9
Producing Obj 10
Wrote Obj 10
Object Obj 10 has ID ObjId 10

您可能想知道为什么两个管道之一接受初始参数的要求是有意义的,而不是抽象地证明这是范畴法则所要求的。简单的英语解释是,另一种方法是在到达其第一个语句之前,您需要缓冲Object两个管道“之间”的第一个传输。这种方法会产生很多有问题的行为和错误的极端情况,但可能最重要的问题是管道组合将不再是关联的,并且效果的顺序会根据您组合事物的顺序而改变。writeObjectsrequest

双向管道组合运算符的好处是类型可以计算出来,因此您总是可以通过研究类型来推断组件是“主动”(即启动控制)还是“被动”(即等待输入) . 如果组合说某个管道(如writeObjects)必须接受参数,那么它是被动的。如果它不带参数(如produceObjects),那么它是活动的并启动控制。因此,组合迫使您在管道中最多有一个活动管道(不接受初始参数的管道),这就是开始控制的管道。

于 2013-08-27T18:12:28.450 回答
4

'const' 是您删除数据的地方。为了获取所有数据,您可能需要执行以下基于推送的工作流:

writeObjects :: Object -> Proxy ObjectId Object () X IO r
writeObjects obj = go 0 obj
  where
    go objid obj = do
        lift $ putStrLn $ "Wrote "++show obj
        obj' <- request objid
        go (objid + 1) obj'

-- produceObjects as before

main = void $ run $ produceObjects objects >>~ writeObjects
于 2013-08-27T18:13:31.860 回答
2

我们一直在邮件列表上讨论这个问题,但我想我也会把它扔在这里给那些感兴趣的人。

您的问题是您有两个协同程序,它们都准备好互相吐出值。任何一方都不需要另一方的输入来产生价值。那么谁先走呢?好吧,你自己说的:

writeObjects首先阻塞请求,发送初始ObjId 0上游

好吧,这意味着我们需要延迟produceObjects,以便它在吐出相应的对象之前等待一个ObjId信号(即使它显然不需要所说的 ID)。

深入研究 Proxy 内部,这里是魔法咒语,我现在不会费心去解释。基本的想法是在你需要之前接受输入,然后在需要时应用输入,然后假装你需要一个新的输入(即使你还不需要那个):

delayD :: (Monad m) => Proxy a' a b' b m r -> b' -> Proxy a' a b' b m r
delayD p0 b' = case p0 of
    Request a' f -> Request a' (go . f)
    Respond b  g -> Respond b  (delayD (g b'))
    M m          -> M (liftM go m)
    Pure r       -> Pure r
  where
    go p = delayD p b'

现在,您可以使用它produceObjects objects而不是const,并且您的第二次尝试按需要工作:

delayD (produceObjects objects) +>> writeObjects

我们正在delayD邮件列表中讨论它是否值得包含在标准 Pipes 曲目中。

于 2013-08-27T21:39:20.820 回答