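I have some code that uses a file handle to simulate a sink for a ByteString streamed from a source (AWS S3). If we want to use Network.Websocket as the sink instead, would it be enough to swap LBS.writeFile in the code below for sendBinaryData (with a handle to the connection)?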

{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}

import qualified Aws
import qualified Aws.S3 as S3
import           Data.Conduit (($$+-))
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as SP
import qualified Data.ByteString.Lazy as LBS
import Streaming as S
import Streaming.Prelude as S hiding (show,print)
import Control.Concurrent.Async (async,waitCatch)
import Data.Text as T (Text)

data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration, _aws_s3cfg :: S3.S3Configuration a, _aws_httpmgr :: SP.Manager }

getObject :: AwsConfig Aws.NormalQuery -> T.Text -> T.Text ->  IO Int
getObject cfg bucket key = do
  req <- waitCatch =<< async (runResourceT $ do
    {- Create a request object with S3.getObject and run the request with pureAws. -}
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <- 
      Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
        S3.getObject bucket key
    {- Stream the response to a lazy bytestring -}
    liftIO $ LBS.writeFile "testaws" LBS.empty -- this will be replaced by content-length of the bytes 
    let obj = (($$+- CL.mapM_ S.yield) . hoist lift ) (SP.responseBody rsp)
    S.mapM_ (liftIO . (LBS.appendFile "testaws") . LBS.fromStrict) obj
    return $ lookup "content-length" (S3.omUserMetadata mdata))
  case req of
    Left _ -> return 2 -- perhaps, we could use this to send an error message over websocket
    Right _ -> return 0

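The source of confusion for me is how the termination of the stream is determined. For files, this is handled by the writeFile API. What about sendBinaryData? Does it handle termination the same way writeFile does? Or is it determined by the data parser on the client side?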

Update

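This question is about how to stream the data to a websocket handle (assume a handle has already been provided), the way we do with the file handle in the example above. It is not really about how to manage the handle in ResourceT. conduit does seem to take the mapM_ approach to sinking the data, so that does seem to be the way to go.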

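The termination question comes from this line of thought: if a function is listening for data on the other side of the WebSocket handle, then determining the end of the message seems to matter in a streaming context. Given a function like this: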

f :: LBS.ByteString -> a

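If we use S.mapM_ to stream the data to the websocket handle, does it add some kind of end-of-stream marker so that f, listening on the other end, can stop processing the lazy bytestring? Otherwise f will not know when the message is complete.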
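To make the concern concrete, here is a minimal sketch of what the two ends might have to agree on. It assumes a made-up convention in which an empty message marks the end of the stream; this is not something Network.WebSockets is known to provide. The names endMarker, frameStream and collect are hypothetical, and plain lists stand in for the connection.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as LBS

-- Hypothetical convention: an empty message marks the end of the stream.
endMarker :: B.ByteString
endMarker = B.empty

-- Sender side: the data chunks followed by the marker. Empty data chunks
-- are dropped so they cannot be mistaken for the marker.
frameStream :: [B.ByteString] -> [B.ByteString]
frameStream chunks = filter (not . B.null) chunks ++ [endMarker]

-- Receiver side: collect messages up to the marker into the lazy
-- bytestring that a function like f would consume.
collect :: [B.ByteString] -> LBS.ByteString
collect = LBS.fromChunks . takeWhile (not . B.null)

main :: IO ()
main = do
  let sent = frameStream (map BC.pack ["abc", "", "def"])
  print (length sent)
  LBS.putStr (collect sent)
  putStrLn ""
```

Without some agreement of this kind (a marker, a length sent up front, or closing the connection), the receiver in this model has no way to tell the last chunk from any other.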

3 Answers

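You are right that the handle needs some extra care. However, since you are already using the ResourceT monad transformer, this is straightforward to do with allocate. allocate lets you create a handle in the resource monad and register a cleanup action (which in your case is just closing the connection).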

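ok <- runResourceT $ do
  -- acquire the connection and register sendClose as its cleanup action
  (releaseKey, handle) <-
    allocate (WebSockets.acceptRequest request) 
             (`WebSockets.sendClose` closeMessage)
  -- sendBinaryData runs in IO, so lift it into ResourceT
  liftIO $ WebSockets.sendBinaryData handle payload
  return ok
where
  request = ...
  closeMessage = ...
  payload = ...  -- the bytes to send (`data` is a reserved word)
  ok = ...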

By using allocate, the handle is guaranteed to be closed by the time runResourceT returns ok.

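However, I am not entirely sure this is what you want. It seems to me that getObject should not know how to accept and close WS connections; perhaps it should take the WS connection handle as an argument and then write to it. If you upgrade its return type to ResourceT, you can make the caller of getObject responsible for calling runResourceT, allocating the WS handle, and so on. But hopefully the example above is enough to get you going.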
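A minimal sketch of that separation, using only base and bytestring. Here streamObject and its list of chunks are hypothetical stand-ins for getObject and the S3 response body, and the caller supplies the send action. With a real connection the caller would presumably pass something like WebSockets.sendBinaryData connection in place of the IORef writer.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Hypothetical stand-in for the S3 download: it only knows how to hand
-- each chunk to the `send` action supplied by the caller, and returns
-- the number of bytes it passed on.
streamObject :: (B.ByteString -> IO ()) -> [B.ByteString] -> IO Int
streamObject send chunks = do
  mapM_ send chunks
  return (sum (map B.length chunks))

main :: IO ()
main = do
  -- The caller owns the "connection"; here an IORef collects what was sent.
  sentRef <- newIORef []
  n <- streamObject (\bs -> modifyIORef' sentRef (bs :))
                    (map BC.pack ["hello ", "world"])
  sent <- readIORef sentRef
  BC.putStrLn (B.concat (reverse sent))
  print n
```

Because the download code never sees the connection itself, accepting and closing it stays entirely with the caller.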

Answered 2016-06-14T14:54:33.640

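Here are some bits and pieces that might make things easier to follow. First, for a first little demo, revising your getObject, I use Streaming.ByteString.writeFile, which is in ResourceT anyway, to avoid the detour through lazy bytestrings.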

{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}

import qualified Aws
import qualified Aws.S3 as S3
import           Data.Conduit 
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as HTTP
import qualified Data.ByteString.Streaming as SB
import qualified Data.ByteString.Streaming.Internal as SB
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import           Streaming as S
import           Streaming.Prelude as S hiding (show,print)
import           Control.Concurrent.Async (async,waitCatch)
import           Data.Text as T (Text) 
import qualified Network.WebSockets as WebSockets
import           Control.Monad.Trans.Resource

data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration
                             , _aws_s3cfg :: S3.S3Configuration a
                             , _aws_httpmgr :: HTTP.Manager }

getObject :: AwsConfig Aws.NormalQuery -> FilePath -> T.Text -> T.Text ->  IO Int
getObject cfg file bucket key = do
  req <- waitCatch =<< async (runResourceT $ do
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <- 
      Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
        S3.getObject bucket key
    let bytestream = do 
         -- lookup "content-length" (S3.omUserMetadata mdata))
         SB.chunk B.empty -- this will be replaced by content-length 
         hoist lift (HTTP.responseBody rsp)  $$+- CL.mapM_ SB.chunk 
    SB.writeFile file bytestream ) -- this is in ResourceT 
  case req of
    Left _ -> return 2
    Right _ -> return 0

We can abstract from what you are more or less doing here with SB.writeFile:

getObjectAbstracted
      :: (SB.ByteString (ResourceT IO) () -> ResourceT IO b)
         -> AwsConfig Aws.NormalQuery -> S3.Bucket -> Text -> ResourceT IO b
getObjectAbstracted action cfg bucket key = do
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <- 
      Aws.pureAws (_aws_cfg cfg) 
                  (_aws_s3cfg cfg) 
                  (_aws_httpmgr cfg) 
                  (S3.getObject bucket key)

    action (hoist lift (HTTP.responseBody rsp)  $$+- CL.mapM_ SB.chunk) 

Now we need a little helper that is not included in the streaming bytestring library:

mapMChunks_ :: Monad m => (B.ByteString -> m ()) -> SB.ByteString m r -> m r
mapMChunks_ act bytestream = do
  (a S.:> r) <- SB.foldlChunksM (\_ bs -> act bs) (return ()) bytestream
  return r

and can then proceed more or less as @haoformayor planned, using streaming bytestrings:

writeConnection :: MonadIO m => WebSockets.Connection -> SB.ByteString m r -> m r
writeConnection connection  = 
  mapMChunks_ (liftIO . WebSockets.sendBinaryData connection)

-- following `haoformayor`
connectWrite
    :: (MonadResource m, WebSockets.WebSocketsData a) 
    => WebSockets.PendingConnection 
    -> a                  -- closing  message
    -> SB.ByteString m r  -- stream from aws
    -> m r
connectWrite request closeMessage bytestream = do
    (releaseKey, connection) <- allocate (WebSockets.acceptRequest request)
                                         (`WebSockets.sendClose` closeMessage)
    writeConnection connection bytestream

getObjectWS :: WebSockets.WebSocketsData a =>
       WebSockets.PendingConnection
       -> a
       -> AwsConfig Aws.NormalQuery
       -> S3.Bucket
       -> Text
       -> ResourceT IO ()
getObjectWS request closeMessage = getObjectAbstracted (connectWrite request closeMessage)

Of course, none of this so far makes any use of the difference between conduit and streaming/streaming-bytestring.

Answered 2016-06-14T16:18:58.883

(Caveat - code not tested.)

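Your code reopens the output file and appends to it every time a packet comes in. Clearly a better solution is to use LBS.hPutStr to write to the file through an already opened file handle.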

That is, instead of:

S.mapM_ (liftIO . (LBS.appendFile "testaws") . LBS.fromStrict) obj

you want to use:

S.mapM_ (liftIO . (LBS.hPutStr h) . LBS.fromStrict) obj

Of course, this references the handle h, and where does that come from?

One solution is to pass it in to getObject, or otherwise create it before running the body of getObject, e.g.:

getObject cfg bucket key = withFile "output" WriteMode $ \h -> do
    req <- ...
    ...
    S.mapM_ (liftIO . (LBS.hPutStr h) . LBS.fromStrict) obj
    ...

Or perhaps you have to create it inside runResourceT... I'm not sure.

Update - see @haoformayor's answer for how to have ResourceT manage the file handle for you.
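For reference, the open-once idea can be sketched on its own with base and bytestring. This is only an illustration under simplifying assumptions: a plain list of strict chunks stands in for the stream, writeChunks is a hypothetical name, and no ResourceT is involved.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import System.IO (IOMode (WriteMode), withFile)

-- Write every chunk through one handle that is opened once and closed by
-- withFile, rather than reopening the file for each chunk.
writeChunks :: FilePath -> [B.ByteString] -> IO ()
writeChunks path chunks =
  withFile path WriteMode $ \h -> mapM_ (B.hPut h) chunks

main :: IO ()
main = do
  writeChunks "testaws" (map BC.pack ["first ", "second"])
  -- Read the file back to check that the chunks arrived in order.
  B.readFile "testaws" >>= BC.putStrLn
```

withFile closes the handle even if writing a chunk throws, which is the same guarantee allocate gives inside ResourceT.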

Answered 2016-06-14T14:55:48.897