1

我正在尝试StoreStreaming. Store已经decode使用名为decodeMessageBS.

我尝试如下进行store反序列化的基本实现(暂时不保持简单)。但是,因为不断抛出的逻辑似乎有问题:StreamingbracketpopperdecodeMessageBSPeekException

{-# LANGUAGE  RankNTypes #-}
import Streaming.Prelude as S hiding (print,show)
import Data.IORef
import Streaming as S
import qualified Data.ByteString as BS (ByteString,empty,length)
import System.IO.ByteBuffer
import Data.Store
import Data.Store.Streaming

streamDecode :: forall a. (Store a) => ByteBuffer -> Stream (Of BS.ByteString) IO () -> Stream (Of a) IO ()
streamDecode bb inp = do
    ref <- lift $ newIORef inp 
    let popper = do
        r <- S.uncons =<< readIORef ref
        case r of
          Nothing -> return Nothing 
          Just (a,rest) -> writeIORef ref rest >> return (Just a)
    let go = do
          r <- lift $ decodeMessageBS bb $ popper
          lift $ print "Decoding"
          case r of 
            Nothing -> return ()
            Just msg -> (lift $ print "Message found") >> (S.yield . fromMessage $ msg) >> go
    go 

我可以很好地解码我的测试文件decodeIOPortionWith- 所以,问题似乎出在 feed 所需的逻辑上decodeMessageBSpopper将欣赏有关此处逻辑有什么问题的指针。

4

1 回答 1

0

发生这种PeekException情况是因为Store在流模式下保存消息时使用了不同的格式,与Binary. 使用函数时,它需要一个Message围绕数据的类型包装器。不需要包装器,因此可以很好地处理保存的数据。在我修复序列化以将数据保存为编码之后,在该数据上工作正常。StoredecodeMessageBSdecodeIOPortionWithMessageStoreMessagedecodeMessageBS

于 2018-04-01T19:37:54.447 回答