signalExhausted
无法定义类似管道,但等效于(>-> signalExhausted)
can 的函数。
>->
是该pull
类别的专门版本。执行由下游代理从上游代理中提取数据驱动。下游代理向上游发送一个空请求()
并阻塞,直到从上游代理返回一个包含值的响应。当上游代理用尽并且没有更多值要发回时,它return
是。您可以return
在 的定义中看到对这些示例很重要的each
。
each = F.foldr (\a p -> yield a >> p) (return ())
-- what to do when the data's exhausted ^
下游代理需要一个值才能继续运行,但管道库无法提供任何值,因此下游代理永远不会再次运行。由于它永远不会再次运行,因此它无法修改或对数据做出反应。
这个问题有两种解决方案。最简单的方法是越过上游管道并在完成后添加一个。map
Value
yield Exhausted
import Pipes
import qualified Pipes.Prelude as P
data Value a = Value a | Exhausted
deriving (Show)
signalExhausted p = p >-> P.map Value >> yield Exhausted
除了函数signalExhausted
代替(>-> signalExhausted)
.
let xs = words "hubble bubble toil and trouble"
print . P.toList . signalExhausted $ each xs
[Value "hubble",Value "bubble",Value "toil",Value "and",Value "trouble",Exhausted]
这个问题的更通用的解决方案是阻止上游代理返回,而是在它耗尽时向下游发出信号。我在一个相关问题的回答中演示了如何做到这一点。
import Control.Monad
import Pipes.Core
returnDownstream :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
returnDownstream = (forever . respond . Left =<<) . (respond . Right <\\)
这将替换 eachrespond
并respond . Right
替换return
为forever . respond . left
,将返回与响应一起发送到下游。
returnDownstream
比你要找的更一般。我们可以演示如何使用它来重新创建signalExhausted
. returnDownstream
将返回的管道转换为永不返回的管道,而是将其返回值作为Left
an 的值向下游转发Either
。
signalExhausted p = returnDownstream p >-> respondLeftOnce
respondLeftOnce
是一个示例下游代理。下游代理可以区分保存在中的常规值和保存在中Right
的返回值Left
。
respondLeftOnce :: Monad m => Pipe (Either e a) (Value a) m ()
respondLeftOnce = go
where
go = do
ea <- await
case ea of
Right a -> yield (Value a) >> go
Left _ -> yield Exhausted -- The upstream proxy is exhausted; do something else