如果我们想允许Stream
s 存在于 中ResourceT
,我们可以不使用streaming-wai中的函数(仅适用于Stream
基于 s的函数),而是构建在network-wai之类的IO
函数之上:responseStream
import Control.Monad.Trans.Resource
import Network.Wai
import Streaming
import qualified Streaming.Prelude as S
import Data.ByteString.Builder (byteString, Builder)
streamingResponseR :: Stream (Of ByteString) (ResourceT IO) r
-> Status
-> ResponseHeaders
-> Response
streamingResponseR stream status headers =
responseStream status headers streamingBody
where
streamingBody writeBuilder flush =
let writer a =
do liftIO (writeBuilder (byteString a))
-- flushes for every produced bytestring, perhaps not optimal
liftIO flush
in runResourceT $ void $ S.effects $ S.for stream writer
streamingBody
有 type StreamingBody
,它实际上是一个函数的类型同义词,(Builder -> IO ()) -> IO () -> IO ()
它接受一个写回调和一个刷新回调作为参数,并使用它们使用范围内的某些数据源来编写响应。(请注意,这些回调由 WAI 提供,而不是由用户提供。)
在我们的例子中,数据源是Stream
位于ResourceT
. 我们需要使用 解除写入和刷新回调(存在于 中IO
)liftIO
,还记得调用runResourceT
以在最后返回一个简单IO
的操作。
如果我们只想在发出的字节串的累积长度达到某个限制后才刷新响应怎么办?
每次达到限制时,我们都需要一个函数(此处未实现)来创建一个除法:
breaks' :: Monad m
=> Int
-> Stream (Of ByteString) m r
-> Stream (Stream (Of ByteString) m) m r
breaks' breakSize = undefined
intercalates
然后我们可以在写入流之前使用 , 在每个组之间插入刷新操作:
streamingBodyFrom :: Stream (Of ByteString) (ResourceT IO) ()
-> Int
-> StreamingBody
streamingBodyFrom stream breakSize writeBuilder flush =
let writer a = liftIO (writeBuilder (byteString a))
flusher = liftIO flush
broken = breaks' breakSize stream
in runResourceT . S.mapM_ writer . S.intercalates flusher $ broken