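I am trying to build a streaming library using the abstractions described in the paper "Faster coroutine pipelines". I have modified the code so that it handles pipe exit correctly (instead of throwing an error when that happens):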
-- | r: return type of the continuation, i: input stream type, o: output stream type,
-- m: underlying monad, a: return type
newtype ContPipe r i o m a = MakePipe {runPipe :: (a -> Result r m i o) -> Result r m i o}
  deriving
    ( Functor,
      Applicative,
      Monad
    )
    via (Cont (Result r m i o))
type Result r m i o = InCont r m i -> OutCont r m o -> m r
newtype InCont r m i = MakeInCont {resumeIn :: OutCont r m i -> m r}
newtype OutCont r m o = MakeOutCont {resumeOut :: Maybe o -> InCont r m o -> m r}
suspendIn :: Result r m i o -> InCont r m i -> InCont r m o
suspendIn k ik = MakeInCont \ok -> k ik ok
suspendOut :: (Maybe i -> Result r m i o) -> OutCont r m o -> OutCont r m i
suspendOut k ok = MakeOutCont \v ik -> k v ik ok
emptyIk :: InCont r m a
emptyIk = MakeInCont \ok -> resumeOut ok Nothing emptyIk
await :: ContPipe r i o m (Maybe i)
await = MakePipe \k ik ok -> resumeIn ik (suspendOut k ok)
yield :: o -> ContPipe r i o m ()
yield v = MakePipe \k ik ok -> resumeOut ok (Just v) (suspendIn (k ()) ik)
(.|) :: forall r i e o m a. ContPipe r i e m () -> ContPipe r e o m a -> ContPipe r i o m a
p .| q = MakePipe \k ik ok ->
  runPipe
    q
    (\a _ ok' -> k a emptyIk ok')
    (suspendIn (runPipe p (\() -> f)) ik)
    ok
  where
    f :: Result r m i e
    f _ ok = resumeOut ok Nothing emptyIk
runContPipe :: forall m a. Applicative m => ContPipe a () Void m a -> m a
runContPipe p = runPipe p (\a _ _ -> pure a) ik ok
  where
    ik :: InCont a m ()
    ik = MakeInCont \ok' -> resumeOut ok' (Just ()) ik
    ok :: OutCont a m Void
    ok = MakeOutCont \_ ik' -> resumeIn ik' ok
I want to implement a function
fork :: ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
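that merges two consumer streams into one (similar to conduit's ZipSink). It should have the following semantics:
- If neither stream has exited and both are accepting input, feed the same input value to both streams.
- If one stream has exited, store its return value and keep feeding input to the stream that still accepts it.
- If both streams have exited, return the two return values as a tuple.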
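As a reference point, the three rules above can be sketched with a much simpler pure model that is independent of ContPipe. Everything in this block (Sink, feed, forkSink, takeSink, sumSink, runSink) is a hypothetical name made up for illustration and is not part of the paper or of the code above; it only pins down the intended behaviour and is not an implementation for ContPipe.

```haskell
-- A consumer is either finished with a result, or waiting for the next
-- input. 'Nothing' signals that the upstream has ended.
data Sink i a
  = Done a
  | Await (Maybe i -> Sink i a)

-- Feed one input to a sink. A finished sink keeps its stored result and
-- silently ignores the input (rule 2).
feed :: Maybe i -> Sink i a -> Sink i a
feed _ (Done a) = Done a
feed v (Await f) = f v

-- Combine two sinks: finish only when both are done (rule 3), otherwise
-- hand the same input to both (rules 1 and 2).
forkSink :: Sink i a -> Sink i b -> Sink i (a, b)
forkSink (Done a) (Done b) = Done (a, b)
forkSink p q = Await (\v -> forkSink (feed v p) (feed v q))

-- Example sink: collect at most n inputs, then exit early.
takeSink :: Int -> Sink i [i]
takeSink n = go n []
  where
    go k acc
      | k <= 0 = Done (reverse acc)
      | otherwise = Await $ \v -> case v of
          Nothing -> Done (reverse acc)
          Just x -> go (k - 1) (x : acc)

-- Example sink: sum all inputs until the upstream ends.
sumSink :: Sink Int Int
sumSink = go 0
  where
    go acc = Await $ \v -> case v of
      Nothing -> Done acc
      Just x -> go (acc + x)

-- Drive a sink from a list, sending Nothing once the list is exhausted.
-- This assumes the sink eventually finishes after receiving Nothing.
runSink :: [i] -> Sink i a -> a
runSink _ (Done a) = a
runSink [] (Await f) = runSink [] (f Nothing)
runSink (x : xs) (Await f) = runSink xs (f (Just x))
```

In this model, runSink [1, 2, 3] (forkSink (takeSink 2) sumSink) evaluates to ([1,2],6), and swapping the two arguments gives (6,[1,2]), so it does not matter which side exits first.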
Here is my attempt:
We reuse the loop function from the paper, which connects one InCont r m i to two OutCont r m i and actively resumes the continuations.
loop :: InCont r m i -> OutCont r m i -> OutCont r m i -> m r
loop ik ok1 ok2 =
  resumeIn ik $ MakeOutCont \v ik' ->
    resumeOut ok1 v $ MakeInCont \ok1' ->
      resumeOut ok2 v $ MakeInCont \ok2' -> loop ik' ok1' ok2'
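To see what loop does in isolation, here is a small self-contained sketch that drives two output continuations from one list-backed input. InCont, OutCont and loop are copied from above (written with $ so the block does not need BlockArguments); fromList, logger and demo are hypothetical helpers made up for this illustration.

```haskell
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

newtype InCont r m i = MakeInCont {resumeIn :: OutCont r m i -> m r}

newtype OutCont r m o = MakeOutCont {resumeOut :: Maybe o -> InCont r m o -> m r}

-- Same definition as in the question: pull one value from ik, hand it to
-- ok1, then to ok2, then repeat with the updated continuations.
loop :: InCont r m i -> OutCont r m i -> OutCont r m i -> m r
loop ik ok1 ok2 =
  resumeIn ik $ MakeOutCont $ \v ik' ->
    resumeOut ok1 v $ MakeInCont $ \ok1' ->
      resumeOut ok2 v $ MakeInCont $ \ok2' -> loop ik' ok1' ok2'

-- Hypothetical source: produces the list elements, then Nothing forever.
fromList :: [i] -> InCont r m i
fromList [] = MakeInCont $ \ok -> resumeOut ok Nothing (fromList [])
fromList (x : xs) = MakeInCont $ \ok -> resumeOut ok (Just x) (fromList xs)

-- Hypothetical consumer: records every value under a tag. On end of input
-- it returns without resuming, which ends the whole run.
logger :: String -> IORef [String] -> OutCont () IO Int
logger tag ref = MakeOutCont $ \v ik -> case v of
  Just x -> do
    modifyIORef ref ((tag ++ show x) :)
    resumeIn ik (logger tag ref)
  Nothing -> pure ()

-- Run both loggers on the same input and return the log in order.
demo :: IO [String]
demo = do
  ref <- newIORef []
  loop (fromList [1, 2]) (logger "a" ref) (logger "b" ref)
  reverse <$> readIORef ref
```

Evaluating demo yields ["a1","b1","a2","b2"]: each input value reaches the first continuation and then the second before the next value is pulled.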
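Thanks to loop, we can connect the input of the resulting pipe to both pipes at once; the output is shared between the two pipes (which doesn't matter, because you cannot yield a Void).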
fork :: forall r m i a b. ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
fork p q =
  MakePipe \k ik ok ->
    let f :: a -> Result r m i Void
        f a ik' ok' = _
        g :: b -> Result r m i Void
        g b ik' ok' = _
     in runPipe
          p
          f
          (MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok)
          ok
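Now we just need to fill in the continuations f and g, which are called when p and q exit. If g has already been called by the time f is called, which means q has exited, then f should call the continuation k. If g has not been called yet, then f should store the return value a and resume the input continuation (discarding all values passed to it). It seems to me that this is impossible without some form of shared state. We can try to store that state with a state monad in m: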
fork :: forall r m i a b. MonadState (Maybe (Either a b)) m => ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
fork p q =
  MakePipe \k ik ok ->
    let f :: a -> Result r m i Void
        f a ik' ok' = do
          s <- get
          case s of
            Nothing -> do
              put (Just (Left a))
              resumeIn ik' sinkOk
            Just (Right b) -> do
              k (a, b) ik' ok'
            _ -> error "unexpected state"
        g :: b -> Result r m i Void
        g b ik' ok' = do
          s <- get
          case s of
            Nothing -> do
              put (Just (Right b))
              resumeIn ik' sinkOk
            Just (Left a) -> do
              k (a, b) ik' ok'
            _ -> error "unexpected state"
     in runPipe
          p
          f
          (MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok)
          ok
sinkOk is an output continuation that discards all input:
sinkOk :: OutCont r m o
sinkOk = MakeOutCont \_ ik -> resumeIn ik sinkOk
We can now add some helper functions for testing:
print' :: MonadIO m => Show i => ContPipe r i o m ()
print' = do
  m <- await
  case m of
    Nothing -> pure ()
    Just i -> do
      lift $ liftIO (print i)
      print'
upfrom :: Int -> ContPipe r i Int m a
upfrom i = do
  yield i
  upfrom (i + 1)
take' :: Int -> ContPipe r i i m ()
take' n
  | n <= 0 = pure ()
  | otherwise = do
      m <- await
      case m of
        Nothing -> pure ()
        Just i -> do
          yield i
          take' (n - 1)
This works when p exits earlier than q:
flip evalStateT Nothing $ runContPipe $ upfrom 1 .| take' 3 .| fork print' print'
gives the desired output:
1
1
2
2
3
3
((),())
But when q exits earlier than p, it goes into an infinite loop:
flip evalStateT Nothing $ runContPipe $ upfrom 1 .| take' 3 .| fork print' (take' 2 .| print')
Output:
1
1
2
2
<loops>