我有一个代码,它使用文件句柄来模拟Bytestring
来自源(AWS S3
)的流式传输的接收器。如果我们想用作接收器,将下面的代码与(带有连接句柄)Network.Websocket
交换就足够了吗?LBS.writeFile
sendBinaryData
{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}
import qualified Aws
import qualified Aws.S3 as S3
import Data.Conduit (($$+-))
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as SP
import qualified Data.ByteString.Lazy as LBS
import Streaming as S
import Streaming.Prelude as S hiding (show,print)
import Control.Concurrent.Async (async,waitCatch)
import Data.Text as T (Text)
data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration, _aws_s3cfg :: S3.S3Configuration a, _aws_httpmgr :: SP.Manager }
getObject :: AwsConfig Aws.NormalQuery -> T.Text -> T.Text -> IO Int
getObject cfg bucket key = do
req <- waitCatch =<< async (runResourceT $ do
{- Create a request object with S3.getObject and run the request with pureAws. -}
S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <-
Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
S3.getObject bucket key
{- Stream the response to a lazy bytestring -}
liftIO $ LBS.writeFile "testaws" LBS.empty -- this will be replaced by content-length of the bytes
let obj = (($$+- CL.mapM_ S.yield) . hoist lift ) (SP.responseBody rsp)
S.mapM_ (liftIO . (LBS.appendFile "testaws") . LBS.fromStrict) obj
return $ lookup "content-length" (S3.omUserMetadata mdata))
case req of
Left _ -> return 2 -- perhaps, we could use this to send an error message over websocket
Right _ -> return 0
对我来说困惑的根源是如何确定流的终止?对于文件,这由writeFile
API 处理。怎么样sendBinaryData
?它是否以类似的方式处理终止writeFile
?还是由客户端的数据解析器确定?
更新
这个问题是关于如何将数据流式传输到 websocket 句柄(假设已经提供了一个句柄),就像我们在上面的示例中处理文件句柄一样,而不是关于如何在resourceT
. conduit
似乎确实采取mapM_
了接收数据的方法。所以,这似乎确实是要走的路。
终止问题是因为我有这样的想法:如果我们有一个函数在 Websocket 句柄的另一侧监听数据,那么确定消息的结束在流上下文中似乎很重要。给定如下函数:
f :: LBS.ByteString -> a
如果我们确实S.mapM_
将数据流式传输到 websocket 句柄,它是否会添加某种end of stream
标记,以便f
在另一端监听可以停止处理惰性字节串。否则f
将不知道消息何时完成。