假设我有简单的生产者/消费者模型,消费者希望将一些状态传回给生产者。例如,让下游流动的对象是我们想要写入文件的对象,上游对象是一些表示对象在文件中写入位置的标记(例如偏移量)。
这两个过程可能看起来像这样(带pipes-4.0
),
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
import Pipes
import Pipes.Core
import Control.Monad.Trans.State
import Control.Monad
newtype Object = Obj Int
deriving (Show)
newtype ObjectId = ObjId Int
deriving (Show, Num)
writeObjects :: Proxy ObjectId Object () X IO r
writeObjects = evalStateT (forever go) (ObjId 0)
where go = do i <- get
obj <- lift $ request i
lift $ lift $ putStrLn $ "Wrote "++show obj
modify (+1)
produceObjects :: [Object] -> Proxy X () ObjectId Object IO ()
produceObjects = go
where go [] = return ()
go (obj:rest) = do
lift $ putStrLn $ "Producing "++show obj
objId <- respond obj
lift $ putStrLn $ "Object "++show obj++" has ID "++show objId
go rest
objects = [ Obj i | i <- [0..10] ]
尽管这可能很简单,但我在推理如何组合它们时遇到了相当大的困难。理想情况下,我们需要一个基于推送的控制流,如下所示,
writeObjects
从阻塞开始request
,发送初始ObjId 0
上游。produceObjects
将第一个对象 , 发送到Obj 0
下游writeObjects
写入对象并增加其状态,然后等待request
,这次发送ObjId 1
上游respond
作为produceObjects
回报ObjId 0
produceObjects
对第二个对象在步骤 (2) 继续,Obj 1
我最初的尝试是使用基于推送的组合,如下所示,
main = void $ run $ produceObjects objects >>~ const writeObjects
请注意使用const
来解决其他不兼容的类型(这可能是问题所在)。然而,在这种情况下,我们发现它ObjId 0
被吃掉了,
Producing Obj 0
Wrote Obj 0
Object Obj 0 has ID ObjId 1
Producing Obj 1
...
基于拉动的方法,
main = void $ run $ const (produceObjects objects) +>> writeObjects
遇到类似的问题,这次 drop Obj 0
。
一个人如何以所需的方式创作这些作品?