2

我有一个值body :: BS.ByteString (ResourceT IO) (),来自一个基于BS.readFile. 我想将该值作为来自WaiApplication的响应正文进行流式传输。有一个助手,streamingResponse它接受 type 的值Stream (Of ByteString) IO r。我可以通过使用将我的转换BS.ByteString (ResourceT IO) ()Stream (Of ByteString) (ResourceT IO) ()BS.toChunks但它包含一个额外的ResourceT单子层。传递bodystreamingResponse我:

Couldn't match type ‘ResourceT IO’ with ‘IO’
  Expected type: Stream (Of ByteString) IO ()
    Actual type: Stream (Of ByteString) (ResourceT IO) ()

我尝试了各种方法,例如在 runResourceT 中包装东西、绑定和提升值等,但真的不知道如何进行。如果需要额外的上下文,这是完整项目中的行

更新0

hoist runResourceT body似乎类型检查。有人还向我推荐了一个 Haskell Pipes thread,这可能是一个非常相关的问题,并且可能暗示了一个解决方案。

4

2 回答 2

2

如果我们想允许Streams 存在于 中ResourceT,我们可以不使用streaming-wai中的函数(仅适用于Stream基于 s的函数),而是构建在network-wai之类的IO函数之上:responseStream

import           Control.Monad.Trans.Resource
import           Network.Wai                     
import           Streaming                   
import qualified Streaming.Prelude               as S
import           Data.ByteString.Builder (byteString, Builder)

streamingResponseR :: Stream (Of ByteString) (ResourceT IO) r
                   -> Status
                   -> ResponseHeaders
                   -> Response
streamingResponseR stream status headers =
    responseStream status headers streamingBody
    where
    streamingBody writeBuilder flush =
        let writer a =
                do liftIO (writeBuilder (byteString a))
                    -- flushes for every produced bytestring, perhaps not optimal
                   liftIO flush
         in runResourceT $ void $ S.effects $ S.for stream writer

streamingBody有 type StreamingBody,它实际上是一个函数的类型同义词,(Builder -> IO ()) -> IO () -> IO ()它接受一个写回调和一个刷新回调作为参数,并使用它们使用范围内的某些数据源来编写响应。(请注意,这些回调由 WAI 提供,而不是由用户提供。)

在我们的例子中,数据源是Stream位于ResourceT. 我们需要使用 解除写入和刷新回调(存在于 中IOliftIO,还记得调用runResourceT以在最后返回一个简单IO的操作。


如果我们只想在发出的字节串的累积长度达到某个限制后才刷新响应怎么办?

每次达到限制时,我们都需要一个函数(此处未实现)来创建一个除法:

breaks' :: Monad m 
        => Int 
        -> Stream (Of ByteString) m r 
        -> Stream (Stream (Of ByteString) m) m r
breaks' breakSize = undefined

intercalates然后我们可以在写入流之前使用 , 在每个组之间插入刷新操作:

streamingBodyFrom :: Stream (Of ByteString) (ResourceT IO) () 
                  -> Int 
                  -> StreamingBody
streamingBodyFrom stream breakSize writeBuilder flush =
    let writer a = liftIO (writeBuilder (byteString a))
        flusher = liftIO flush
        broken = breaks' breakSize stream
     in runResourceT . S.mapM_ writer . S.intercalates flusher $ broken
于 2018-05-04T18:15:17.057 回答
1

而不是readFile++withFilehSetBinaryMode足够Data.ByteString.Streaming.fromHandle了吗?

fromHandle产生一个ByteString IO ()可以转换为Stream (Of ByteString) IO ()那个streamingResponsestreamingBody可以接受的。

有一个问题是在哪里放置withFile包围操作。根据WAI 文档,您可以用它包装您的Application-building 函数的结果:

请注意,从 WAI 3.0 开始,此类型以连续传递样式构造,以允许适当的安全资源处理。这在过去是通过其他方式(例如,ResourceT)处理的。

[...]

为了以异常安全的方式分配资源,您可以在调用 responseStream 之外使用方括号模式。


注意:streaming-bytestring的文档说fromHandle当到达 EOF 时会自动关闭句柄。从实现来看,情况似乎并非如此。您需要withFile正确关闭手柄。

于 2018-05-03T21:00:30.250 回答