1

我正在使用 streaming-utils streaming-utils 流式传输HTTP 响应正文。我想跟踪类似于bytestring-progress允许使用 lazy ByteStrings 的进度。我怀疑toChunks有必要这样做,然后减少读取的一些累积字节并返回未修改的原始流。但我无法弄清楚,流媒体文档非常无用,主要是对替代库的宏大比较。

到目前为止,这是我尽最大努力的一些代码。它还不包括计数,只是尝试在块流过时打印块的大小(并且不编译)。

download :: ByteString -> FilePath -> IO ()
download i file = do
  req <- parseRequest . C.unpack $ i
  m <- newHttpClientManager
  runResourceT $ do
    resp <- http req m
    lift . traceIO $ "downloading " <> file
    let body = SBS.fromChunks $ mapsM step $ SBS.toChunks $ responseBody resp
    SBS.writeFile file body

step bs = do
  traceIO $ "got " <> show (C.length bs) <> " bytes"
  return bs
4

1 回答 1

3

我们想要的是通过Stream (Of ByteString) IO ()两种方式遍历:

  • 一种累积ByteStrings 的传入长度并将更新打印到控制台的方法。
  • 将流写入文件的一种。

我们可以借助copy具有以下类型的函数来做到这一点:

copy :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r

copy获取一个流并将其复制到两个不同的单子层中,其中原始流的每个元素都由新分离流的两个层发出。

(请注意,我们正在更改基本 monad,而不是函子。将函子更改为另一个函数的Stream作用是在单个流中分隔组,我们在这里对此不感兴趣。)

下面的函数接受一个流,复制它,用 累积传入字符串的长度S.scan打印它们,然后返回另一个您仍然可以使用的流,例如将其写入文件:

{-# LANGUAGE OverloadedStrings #-}
import Streaming
import qualified Streaming.Prelude as S
import qualified Data.ByteString as B

track :: Stream (Of B.ByteString) IO r -> Stream (Of B.ByteString) IO r
track stream =
      S.mapM_ (liftIO . print) -- brings us back to the base monad, here another stream
    . S.scan (\s b -> s + B.length b) (0::Int) id
    $ S.copy stream

这将打印ByteStrings 以及累积的长度:

main :: IO ()
main = S.mapM_ B.putStr . track $ S.each ["aa","bb","c"]
于 2018-03-06T06:45:56.267 回答