6

我正在尝试使用论文“Faster coroutine pipelines”中描述的抽象来构建一个流式库。我已经修改了代码,以便它正确处理管道退出(而不是在发生这种情况时抛出错误):

-- | r: return type of the continuation, i: input stream type, o: output stream type,
--   m: underlying monad, a: return type
newtype ContPipe r i o m a = MakePipe {runPipe :: (a -> Result r m i o) -> Result r m i o}
  deriving
    ( Functor,
      Applicative,
      Monad
    )
    via (Cont (Result r m i o))

type Result r m i o = InCont r m i -> OutCont r m o -> m r

newtype InCont r m i = MakeInCont {resumeIn :: OutCont r m i -> m r}

newtype OutCont r m o = MakeOutCont {resumeOut :: Maybe o -> InCont r m o -> m r}

suspendIn :: Result r m i o -> InCont r m i -> InCont r m o
suspendIn k ik = MakeInCont \ok -> k ik ok

suspendOut :: (Maybe i -> Result r m i o) -> OutCont r m o -> OutCont r m i
suspendOut k ok = MakeOutCont \v ik -> k v ik ok

emptyIk :: InCont r m a
emptyIk = MakeInCont \ok -> resumeOut ok Nothing emptyIk

await :: ContPipe r i o m (Maybe i)
await = MakePipe \k ik ok -> resumeIn ik (suspendOut k ok)

yield :: o -> ContPipe r i o m ()
yield v = MakePipe \k ik ok -> resumeOut ok (Just v) (suspendIn (k ()) ik)

(.|) :: forall r i e o m a. ContPipe r i e m () -> ContPipe r e o m a -> ContPipe r i o m a
p .| q = MakePipe \k ik ok ->
  runPipe
    q
    (\a _ ok' -> k a emptyIk ok')
    (suspendIn (runPipe p (\() -> f)) ik)
    ok
  where
    f :: Result r m i e
    f _ ok = resumeOut ok Nothing emptyIk

runContPipe :: forall m a. Applicative m => ContPipe a () Void m a -> m a
runContPipe p = runPipe p (\a _ _ -> pure a) ik ok
  where
    ik :: InCont a m ()
    ik = MakeInCont \ok' -> resumeOut ok' (Just ()) ik
    ok :: OutCont a m Void
    ok = MakeOutCont \_ ik' -> resumeIn ik' ok

我想实现一个功能

fork :: ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)

这将两个消费者流合并为一个(类似于管道的ZipSink)。它应该具有以下语义:

  1. 如果两个流都没有退出并且正在接受输入,则将相同的输入值提供给两个流
  2. 如果一个流已退出,则存储返回值,然后将输入馈送到接受该值的流中
  3. 如果两个流都已退出,则将两个流的返回值放入一个元组中。

这是我的尝试:

我们重用了loop论文中连接一个InCont r m i到两个OutCont r m i并主动恢复延续的函数。

loop :: InCont r m i -> OutCont r m i -> OutCont r m i -> m r
loop ik ok1 ok2 =
  resumeIn ik $ MakeOutCont \v ik' ->
    resumeOut ok1 v $ MakeInCont \ok1' ->
      resumeOut ok2 v $ MakeInCont \ok2' -> loop ik' ok1' ok2'

由于loop我们可以将结果管道的输入同时连接到两个管道,输出将在两个管道之间共享(这并不重要,因为你不能产生 a Void)。

fork :: forall r m i a b. ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
fork p q =
  MakePipe \k ik ok ->
    let f :: a -> Result r m i Void
        f a ik' ok' = _
        g :: b -> Result r m i Void
        g b ik' ok' = _
     in runPipe
          p
          f
          (MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok)
          ok

现在我们只需要填写延续f,它们将在它们退出时被g调用。如果在被调用时已经被调用,这意味着已经退出,那么应该调用 continuation ,如果还没有被调用,那么应该存储返回值并恢复输入继续(通过丢弃所有传递的值)似乎对我来说,如果没有某种形式的共享状态,就不可能实现这一点。我们可以尝试使用 state monad 来存储状态:pqgfqfkgfam

fork :: forall r m i a b. MonadState (Maybe (Either a b)) m => ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
fork p q =
  MakePipe \k ik ok ->
    let f :: a -> Result r m i Void
        f a ik' ok' = do
          s <- get
          case s of
            Nothing -> do
              put (Just (Left a))
              resumeIn ik' sinkOk
            Just (Right b) -> do
              k (a, b) ik' ok'
            _ -> error "unexpected state"
        g :: b -> Result r m i Void
        g b ik' ok' = do
          s <- get
          case s of
            Nothing -> do
              put (Just (Right b))
              resumeIn ik' sinkOk
            Just (Left a) -> do
              k (a, b) ik' ok'
            _ -> error "unexpected state"
     in runPipe
          p
          f
          (MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok)
          ok

sinkOk是丢弃所有输入的输出延续:

sinkOk :: OutCont r m o
sinkOk = MakeOutCont \_ ik -> resumeIn ik sinkOk

我们现在可以添加一些辅助功能进行测试:

print' :: MonadIO m => Show i => ContPipe r i o m ()
print' = do
  m <- await
  case m of
    Nothing -> pure ()
    Just i -> do
      lift $ liftIO (print i)
      print'

upfrom :: Int -> ContPipe r i Int m a
upfrom i = do
  yield i
  upfrom (i + 1)

take' :: Int -> ContPipe r i i m ()
take' n
  | n <= 0 = pure ()
  | otherwise = do
    m <- await
    case m of
      Nothing -> pure ()
      Just i -> do
        yield i
        take' (n - 1)

这在p退出早于的情况下有效q

flip evalStateT Nothing $ runContPipe $ upfrom 1 .| take' 3 .| fork print' print'

给出所需的输出:

1
1
2
2
3
3
((),())

q但是当早于 退出时,它会进入无限循环p

flip evalStateT Nothing $ runContPipe $ upfrom 1 .| take' 3 .| fork print' (take 2 print')

输出:

1
1
2
2
<loops>
4

0 回答 0