1

假设我们有一个这样的 JSON 对象(带有 base64 编码的字节串):

TaggedImage = TaggedImage {  id :: Text, image :: ByteString }

现在,我们想从一个源接收,并使用标签image中的信息将其存储在一个位置。id因此,这意味着id必须提前解析(以确定图像的位置),同时image以流方式解析。这是直截了当的吗?

我打算使用pipes-aeson, aws(用于S3存储)并使用作为消费者的存储桶pipesWebsocket生产者进行流式解码(在我们解析确定存储桶的位置S3之前无法创建它)。看着方法,我不知道我是否真的可以按照我上面的要求去做。这是我第一次尝试在 JSON 和管道中进行流式传输。因此,我们将非常感谢您的帮助。idS3decoded

一个对文件系统进行读写的简单示例也可以作为 and 的替代Websocket producerS3 consumer

附录

由于 JSON 键值对根据RFC是无序的,而数组是有序的,因此对于我上面定义的数据类型,image数据可能会出现在 之前。id因此,将其更改为 JSON 数组(Haskell 中的一个元组,aesonTH 派生似乎转换为有序数组)也可能会有所帮助。如果需要,请随时更改数据类型定义,以便对解码进行排序。例如,数据类型可能更改为:

TaggedImage = TaggedImage (Text,ByteString)
4

1 回答 1

1

我相信您将无法重用该pipes-aeson库,因为它不提供在解码的 JSON 记录的嵌套字段上流式传输的方法,也不支持结构的类似光标的导航。这意味着您将需要手动解析 JSON 记录的骨架。

此外,还需要做一些工作来将这种类型的 API 包装base64-bytestring在类似pipes的 API 中:

-- Convert a base64-encoded stream to a raw byte stream
decodeBase64
    :: Producer ByteString m r
    -- ^ Base64-encoded bytes
    -> Producer ByteString m (Either SomeException (Producer ByteString m r)) 
    -- ^ Raw bytes

请注意,Producer如果解码成功完成,则结果将返回字节字符串的其余部分(即 base64 编码字节之后的所有内容)。这使您可以继续解析图像字节结束的位置。

但是,假设您有一个decodeBase64函数,那么代码如何工作的大致轮廓是您将具有三个部分:

  • binary使用适用于的解析器在图像字节之前解析记录的前缀pipes
  • 使用该decodeBase64函数流式传输解码的图像字节
  • 在图像字节之后解析记录的后缀,也使用binary适用于的解析器pipes

换句话说,类型和实现大致如下所示:

-- This would match the "{ 'id' : 'foo', 'image' : '" prefix of the JSON record
skipPrefix :: Data.Binary.Get ()

skipPrefix’ :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
skipPrefix’ = execStateT (Pipes.Binary.decodeGet skipPrefix)

— This would match the "' }" suffix of the JSON record
skipSuffix :: Data.Binary.Get ()

skipSuffix’ :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
skipSuffix’ = execStateT (Pipes.Binary.decodeGet skipSuffix)

streamImage
    ::  Monad m
    =>  Producer ByteString m r
    ->  Producer ByteString m (Either SomeException (Producer ByteString m r))
streamImage p0 = do
    e0 <- lift (skipPrefix’ p0)
    case e0 of
        Left exc -> return (Left (toException exc))
        Right p1 -> do
            e1 <- decodeBase64 p1
            case e1 of
                Left exc -> return (Left exc)
                Right p2 -> do
                    e2 <- lift (skipSuffix’ p2)
                    case e2 of
                        Left exc -> return (Left (toException exc))
                        Right p3 -> return (Right p3)

换句话说,streamImage它将 aProducer作为输入,从 JSON 记录的第一个字符开始,它将流式传输从该记录中提取的解码图像字节。如果解码成功,那么它将在 JSON 记录之后立即返回字节流的剩余部分。

于 2016-05-29T04:37:30.137 回答