5

在为大型(<bloblength><blob>)*编码二进制文件编写反序列化器时,我遇到了各种 Haskell 生产-转换-消费库。到目前为止,我知道四个流媒体库:

这是一个精简的示例,说明当我尝试Word32使用conduit. 一个更现实的例子是首先读取Word32确定 blob 长度的 a,然后产生ByteString该长度的惰性值(然后进一步反序列化)。但在这里我只是尝试从二进制文件中以流方式提取 Word32:

module Main where

-- build-depends: bytestring, conduit, conduit-extra, resourcet, binary

import           Control.Monad.Trans.Resource (MonadResource, runResourceT)
import qualified Data.Binary.Get              as G
import qualified Data.ByteString              as BS
import qualified Data.ByteString.Char8        as C
import qualified Data.ByteString.Lazy         as BL
import           Data.Conduit
import qualified Data.Conduit.Binary          as CB
import qualified Data.Conduit.List            as CL
import           Data.Word                    (Word32)
import           System.Environment           (getArgs)

-- gets a Word32 from a ByteString.
getWord32 :: C.ByteString -> Word32
getWord32 bs = do
    G.runGet G.getWord32be $ BL.fromStrict bs

-- should read BytesString and return Word32
transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = do
    mbs <- await
    case mbs of
        Just bs -> do
            case C.null bs of
                False -> do
                    yield $ getWord32 bs
                    leftover $ BS.drop 4 bs
                    transform
                True -> return ()
        Nothing -> return ()

main :: IO ()
main = do
    filename <- fmap (!!0) getArgs  -- should check length getArgs
    result <- runResourceT $ (CB.sourceFile filename) $$ transform =$ CL.consume
    print $ length result   -- is always 8188 for files larger than 32752 bytes

程序的输出只是读取的 Word32 的数量。事实证明,流在读取第一个块(大约 32KiB)后终止。由于某种原因mbs是 never Nothing,所以我必须检查null bs当块被消耗时哪个停止流。很明显,我的导管transform有问题。我看到了解决方案的两条途径:

  1. await不想去 的第二块,ByteStream那么还有另一个函数可以拉下一个块吗?在我见过的示例中(例如 Conduit 101),这不是它的完成方式
  2. 这只是错误的设置方式transform

这是如何正确完成的?这是正确的方法吗?(性能确实很重要。)

更新:这是一种不好的方法Systems.IO.Streams

module Main where

import           Data.Word                (Word32)
import           System.Environment       (getArgs)
import           System.IO                (IOMode (ReadMode), openFile)
import qualified System.IO.Streams        as S
import           System.IO.Streams.Binary (binaryInputStream)
import           System.IO.Streams.List   (outputToList)

main :: IO ()
main = do
    filename : _ <- getArgs
    h <- openFile filename ReadMode
    s <- S.handleToInputStream h
    i <- binaryInputStream s :: IO (S.InputStream Word32)
    r <- outputToList $ S.connect i
    print $ last r

'Bad' 表示:对时间和空间要求很高,不处理 Decode 异常。

4

3 回答 3

3

您的直接问题是由您使用leftover. 该函数用于“提供单个剩余输入以供当前单子绑定中的下一个组件使用”,因此当您bs在循环之前提供它时,transform您实际上是丢弃了其余的字节串(即之后bs)。

基于您的代码的正确解决方案将使用增量输入接口Data.Binary.Get替换您的yield/组合为leftover完全消耗每个块的东西。不过,更实用的方法是使用binary-conduit包,它以以下形式提供conduitGet(它的源代码很好地了解了“手动”实现的样子):

import           Data.Conduit.Serialization.Binary

-- etc.

transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = conduitGet G.getWord32be

一个警告是,如果字节总数不是 4 的倍数(即最后一个Word32不完整),这将引发解析错误。在不太可能的情况下,这不是您想要的,一个懒惰的出路就是简单地\bs -> C.take (4 * truncate (C.length bs / 4)) bs在输入字节串上使用。

于 2016-10-08T18:38:15.067 回答
3

使用pipes(and pipes-groupand pipes-bytestring) 演示问题简化为组合子。首先,我们将传入的未区分字节流解析为 4 字节的小块:

chunksOfStrict :: (Monad m) => Int -> Producer ByteString m r -> Producer ByteString m r
chunksOfStrict n = folds mappend mempty id . view (Bytes.chunksOf n) 

然后我们将它们映射到Word32s 并(在这里)计算它们。

main :: IO ()
main = do
   filename:_ <- getArgs
   IO.withFile filename IO.ReadMode $ \h -> do
     n <- P.length $ chunksOfStrict 4 (Bytes.fromHandle h) >-> P.map getWord32
     print n

如果我们有少于 4 个字节或无法解析,这将失败,但我们也可以映射

getMaybeWord32 :: ByteString -> Maybe Word32
getMaybeWord32 bs = case  G.runGetOrFail G.getWord32be $ BL.fromStrict bs of
  Left r -> Nothing
  Right (_, off, w32) -> Just w32

然后,以下程序将打印有效 4 字节序列的解析

main :: IO ()
main = do
   filename:_ <- getArgs
   IO.withFile filename IO.ReadMode $ \h -> do
     runEffect $ chunksOfStrict 4 (Bytes.fromHandle h) 
                 >-> P.map getMaybeWord32
                 >-> P.concat  -- here `concat` eliminates maybes
                 >-> P.print 

当然,还有其他处理失败解析的方法。

但是,这里更接近您要求的程序。它从字节流 ( ) 中获取一个四字节段,并在它足够长时Producer ByteString m r读取它;Word32然后它会获取许多传入的字节并将它们累积到一个惰性字节串中,从而产生它。它只是重复这个直到它用完字节。在main下面,我打印了每个产生的惰性字节串:

module Main (main) where 
import Pipes 
import qualified Pipes.Prelude as P
import Pipes.Group (folds) 
import qualified Pipes.ByteString as Bytes ( splitAt, fromHandle, chunksOf )
import Control.Lens ( view ) -- or Lens.Simple (view) -- or Lens.Micro ((.^))
import qualified System.IO as IO ( IOMode(ReadMode), withFile )
import qualified Data.Binary.Get as G ( runGet, getWord32be )
import Data.ByteString ( ByteString )
import qualified Data.ByteString.Lazy.Char8 as BL 
import System.Environment ( getArgs )

splitLazy :: (Monad m, Integral n) =>
   n -> Producer ByteString m r -> m (BL.ByteString, Producer ByteString m r)
splitLazy n bs = do
  (bss, rest) <- P.toListM' $ view (Bytes.splitAt n) bs
  return (BL.fromChunks bss, rest)

measureChunks :: Monad m => Producer ByteString m r -> Producer BL.ByteString m r
measureChunks bs = do
 (lbs, rest) <- lift $ splitLazy 4 bs
 if BL.length lbs /= 4
   then rest >-> P.drain -- in fact it will be empty
   else do
     let w32 = G.runGet G.getWord32be lbs
     (lbs', rest') <- lift $ splitLazy w32 bs
     yield lbs
     measureChunks rest

main :: IO ()
main = do
  filename:_ <- getArgs
  IO.withFile filename IO.ReadMode $ \h -> do
     runEffect $ measureChunks (Bytes.fromHandle h) >-> P.print

这又是粗略的,因为它使用runGetnot runGetOrFail,但这很容易修复。管道标准过程是在解析失败时停止流转换并返回未解析的字节流。

如果您预计它们Word32s用于大数字,因此您不想将相应的字节流累积为惰性字节串,而是说将它们写入不同的文件而不累积,我们可以很容易地更改程序来做到这一点。这将需要复杂地使用管道,但这是使用pipes和的首选方法streaming

于 2016-10-08T20:33:15.937 回答
1

这是我想提出的一个相对简单的解决方案。splitAt这是对Wrapped into Statemonad的重复使用,它提供了与 (a subset of) 相同的接口Data.Binary.Get。结果[ByteString]mainwhileJustover获得的getBlob

module Main (main) where

import           Control.Monad.Loops
import           Control.Monad.State
import qualified Data.Binary.Get      as G (getWord32be, runGet)
import qualified Data.ByteString.Lazy as BL
import           Data.Int             (Int64)
import           Data.Word            (Word32)
import           System.Environment   (getArgs)

-- this is going to mimic the Data.Binary.Get.Get Monad
type Get = State BL.ByteString

getWord32be :: Get (Maybe Word32)
getWord32be = state $ \bs -> do
    let (w, rest) = BL.splitAt 4 bs
    case BL.length w of
        4 -> (Just w', rest) where
            w' = G.runGet G.getWord32be w
        _ -> (Nothing, BL.empty)

getLazyByteString :: Int64 -> Get BL.ByteString
getLazyByteString n = state $ \bs -> BL.splitAt n bs

getBlob :: Get (Maybe BL.ByteString)
getBlob = do
    ml <- getWord32be
    case ml of
        Nothing -> return Nothing
        Just l -> do
            blob <- getLazyByteString (fromIntegral l :: Int64)
            return $ Just blob

runGet :: Get a -> BL.ByteString -> a
runGet g bs = fst $ runState g bs

main :: IO ()
main = do
    fname <- head <$> getArgs
    bs <- BL.readFile fname
    let ls = runGet loop bs where
        loop = whileJust getBlob return
    print $ length ls

中没有错误处理getBlob,但很容易扩展。时间和空间复杂度相当不错,只要谨慎使用结果列表即可。(创建一些随机数据供上述消费的python脚本在这里)。

于 2016-10-10T13:43:04.000 回答