14

在不断努力有效地摆弄比特(例如,参见这个SO question)中,最新的挑战是比特的有效流式传输和消耗。

作为第一个简单任务,我选择在由/dev/urandom. 一个典型的咒语是head -c 1000000 </dev/urandom | my-exe。实际目标是流式传输比特并解码Elias 伽马码,例如,即不是字节块或其倍数的代码。

对于这种可变长度的代码,最好使用take, takeWhile,group等语言进行列表操作。由于 aBitStream.take实际上会消耗部分双流,因此某些 monad 可能会发挥作用。

明显的起点是来自Data.ByteString.Lazy.

A. 计数字节

正如预期的那样,这个非常简单的 Haskell 程序的性能与 C 程序相当。

import qualified Data.ByteString.Lazy as BSL

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ BSL.length bs

B. 添加字节

一旦我开始使用,unpack事情就会变得更糟。

main = do
    bs <- BSL.getContents
    print $ sum $ BSL.unpack bs

令人惊讶的是,Haskell 和 C 表现出几乎相同的性能。

C. 最长相同位序列

作为第一个重要任务,可以像这样找到最长的相同位序列:

module Main where

import           Data.Bits            (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import           Data.List            (group)
import           Data.Word8           (Word8)

splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w `shiftR` i) .&. 1 == 1) [0..7]

bitStream :: BSL.ByteString -> [Bool]
bitStream bs = concat $ map splitByte (BSL.unpack bs)

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ maximum $ length <$> (group $ bitStream bs)

惰性字节串被转换为一个列表[Word8],然后使用移位将每个Word字节拆分为位,从而产生一个列表[Bool]。这个列表列表然后用concat. 获得 的(惰性)列表后Bool,用于group将列表拆分为相同位的序列,然后length对其进行映射。最后maximum给出了想要的结果。很简单,但不是很快:

# C
real    0m0.606s

# Haskell
real    0m6.062s

这种幼稚的实现确实慢了一个数量级。

分析显示分配了相当多的内存(大约 3GB 用于解析 1MB 的输入)。不过,没有观察到大规模的空间泄漏。

从这里我开始四处寻找:

  • 有一个bitstream承诺“具有半自动流融合的快速、打包、严格的比特流(即布尔列表)。 ”。vector不幸的是,它与当前的软件包不是最新的,请参阅此处了解详细信息。
  • 接下来,我调查streaming. 我不太明白为什么我应该需要“有效”的流来让一些 monad 发挥作用——至少在我开始与所提出的任务相反之前,即编码和将比特流写入文件。
  • 怎么foldByteString?我必须引入状态来跟踪消耗的位。这不是很好的take, takeWhile, group, 等语言。

现在我不太确定该去哪里。

更新

我想出了如何使用streamingand来做到这一点streaming-bytestring。我可能做的不对,因为结果非常糟糕。

import           Data.Bits                 (shiftR, (.&.))
import qualified Data.ByteString.Streaming as BSS
import           Data.Word8                (Word8)
import qualified Streaming                 as S
import           Streaming.Prelude         (Of, Stream)
import qualified Streaming.Prelude         as S

splitByte :: Word8 -> [Bool]
splitByte w = (\i-> (w `shiftR` i) .&. 1 == 1) <$> [0..7]

bitStream :: Monad m => Stream (Of Word8) m () -> Stream (Of Bool) m ()
bitStream s = S.concat $ S.map splitByte s

main :: IO ()
main = do
    let bs = BSS.unpack BSS.getContents :: Stream (Of Word8) IO ()
        gs = S.group $ bitStream bs ::  Stream (Stream (Of Bool) IO) IO ()
    maxLen <- S.maximum $ S.mapped S.length gs
    print $ S.fst' maxLen

这将测试您对标准输入几千字节输入之外的任何内容的耐心。分析器说它在 和 中花费了大量时间(输入大小的二次方Streaming.Internal.>>=.loopData.Functor.Of.fmap。我不太确定第一个是什么,但fmap表明(?)这些杂耍对Of a b我们没有任何好处,而且因为我们在 IO monad 中,它不能被优化掉。

我这里也有相当于字节加法器的流SumBytesStream.hs,它比简单的惰性实现稍慢ByteString,但仍然不错。由于streaming-bytestring宣布为“ bytestring io done right ” 我期望更好。那我可能做得不对。

在任何情况下,所有这些位计算都不应该发生在 IO monad 中。但是BSS.getContents强迫我进入 IO 单子,因为getContents :: MonadIO m => ByteString m ()没有出路。

更新 2

按照@dfeuer 的建议,我streaming在master@HEAD 使用了这个包。这是结果。

longest-seq-c       0m0.747s    (C)
longest-seq         0m8.190s    (Haskell ByteString)
longest-seq-stream  0m13.946s   (Haskell streaming-bytestring)

的 O(n^2) 问题Streaming.concat已解决,但我们仍然没有接近 C 基准。

更新 3

Cirdec 的解决方案产生了与 C 相当的性能。使用的构造称为“Church 编码列表”,请参阅此SO 答案或 Haskell Wiki on rank-N types

源文件:

所有源文件都可以在github上找到。具有运行实验和分析的Makefile所有各种目标。默认make将只构建所有内容(首先创建一个bin/目录!),然后make timelongest-seq可执行文件进行计时。C 可执行文件会-c附加一个以区分它们。

4

2 回答 2

1

当流上的操作融合在一起时,可以消除中间分配及其相应的开销。GHC prelude 以重写规则的形式为惰性流提供折叠/构建融合。一般的想法是,如果一个函数产生一个看起来像 foldr 的结果(它具有(a -> b -> b) -> b -> b应用于(:)and的类型[]),而另一个函数使用一个看起来像 foldr 的列表,则可以删除构造中间列表。

对于您的问题,我将构建类似的东西,但使用严格的左折叠(foldl')而不是 foldr。foldl我将使用强制列表看起来像左折叠的数据类型,而不是使用尝试检测何时看起来像的重写规则。

-- A list encoded as a strict left fold.
newtype ListS a = ListS {build :: forall b. (b -> a -> b) -> b -> b}

由于我从放弃列表开始,我们将重新实现列表前奏的一部分。

foldl'可以从列表和字节串的函数中创建严格的左折叠。

{-# INLINE fromList #-}
fromList :: [a] -> ListS a
fromList l = ListS (\c z -> foldl' c z l)

{-# INLINE fromBS #-}
fromBS :: BSL.ByteString -> ListS Word8
fromBS l = ListS (\c z -> BSL.foldl' c z l)

最简单的使用示例是查找列表的长度。

{-# INLINE length' #-}
length' :: ListS a -> Int
length' l = build l (\z a -> z+1) 0

我们还可以映射和连接左折叠。

{-# INLINE map' #-}
-- fmap renamed so it can be inlined
map' f l = ListS (\c z -> build l (\z a -> c z (f a)) z)

{-# INLINE concat' #-}
concat' :: ListS (ListS a) -> ListS a
concat' ll = ListS (\c z -> build ll (\z l -> build l c z) z)

对于您的问题,我们需要能够将一个单词分成几位。

{-# INLINE splitByte #-}
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w `shiftR` i) .&. 1 == 1) [0..7]

{-# INLINE splitByte' #-}
splitByte' :: Word8 -> ListS Bool
splitByte' = fromList . splitByte

和一个ByteString成位

{-# INLINE bitStream' #-}
bitStream' :: BSL.ByteString -> ListS Bool
bitStream' = concat' . map' splitByte' . fromBS

为了找到最长的运行,我们将跟踪前一个值、当前运行的长度和最长运行的长度。我们使字段严格,以便折叠的严格性可以防止 thunk 链在内存中累积。为状态创建严格的数据类型是一种控制其内存表示和何时评估其字段的简单方法。

data LongestRun = LongestRun !Bool !Int !Int

{-# INLINE extendRun #-}
extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest)
  where
    current = if x == previous then run + 1 else 1

{-# INLINE longestRun #-}
longestRun :: ListS Bool -> Int
longestRun l = longest
 where
   (LongestRun _ _ longest) = build l extendRun (LongestRun False 0 0)

我们完成了

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ longestRun $ bitStream' bs

这要快得多,但不完全是 c 的性能。

longest-seq-c       0m00.12s    (C)
longest-seq         0m08.65s    (Haskell ByteString)
longest-seq-fuse    0m00.81s    (Haskell ByteString fused)

该程序分配大约 1 Mb 来从输入中读取 1000000 个字节。

total alloc =   1,173,104 bytes  (excludes profiling overheads)

更新了 github 代码

于 2018-05-01T11:26:16.447 回答
0

我找到了另一个与 C 相当的解决方案。它Data.Vector.Fusion.Stream.Monadic有一个基于Coutts, Leshchinskiy, Stewart 2007 论文的流实现。它背后的想法是使用破坏/展开流融合。

回想一下,列表的展开器 :: (b -> Maybe (a, b)) -> b -> [a]通过重复应用(展开)一个步进函数来创建一个列表,从一个初始值开始。AStream只是一个unfoldr具有起始状态的函数。(该Data.Vector.Fusion.Stream.Monadic库使用 GADT 来创建Step可以方便地进行模式匹配的构造函数。我认为,它也可以在没有 GADT 的情况下完成。)

解决方案的核心部分是mkBitstream :: BSL.ByteString -> Stream Bool将 aBytesString转换为 的流的函数Bool。基本上,我们跟踪 current ByteString,当前字节,以及当前字节还有多少未被消耗。每当一个字节用完时,另一个字节就被砍掉ByteString。当Nothing离开时,流是Done

longestRun函数直接取自@Cirdec 的解决方案。

这是练习曲:

{-# LANGUAGE CPP #-}
#define PHASE_FUSED [1]
#define PHASE_INNER [0]
#define INLINE_FUSED INLINE PHASE_FUSED
#define INLINE_INNER INLINE PHASE_INNER
module Main where

import           Control.Monad.Identity            (Identity)
import           Data.Bits                         (shiftR, (.&.))
import qualified Data.ByteString.Lazy              as BSL
import           Data.Functor.Identity             (runIdentity)
import qualified Data.Vector.Fusion.Stream.Monadic as S
import           Data.Word8                        (Word8)

type Stream a = S.Stream Identity a   -- no need for any monad, really

data Step = Step BSL.ByteString !Word8 !Word8   -- could use tuples, but this is faster

mkBitstream :: BSL.ByteString -> Stream Bool
mkBitstream bs' = S.Stream step (Step bs' 0 0) where
    {-# INLINE_INNER step #-}
    step (Step bs w n) | n==0 = case (BSL.uncons bs) of
                            Nothing        -> return S.Done
                            Just (w', bs') -> return $ 
                                S.Yield (w' .&. 1 == 1) (Step bs' (w' `shiftR` 1) 7)
                       | otherwise = return $ 
                                S.Yield (w .&. 1 == 1) (Step bs (w `shiftR` 1) (n-1))


data LongestRun = LongestRun !Bool !Int !Int

{-# INLINE extendRun #-}
extendRun :: LongestRun -> Bool -> LongestRun
extendRun (LongestRun previous run longest) x  = LongestRun x current (max current longest)
    where current = if x == previous then run + 1 else 1

{-# INLINE longestRun #-}
longestRun :: Stream Bool -> Int
longestRun s = runIdentity $ do
    (LongestRun _ _ longest) <- S.foldl' extendRun (LongestRun False 0 0) s
    return longest

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ longestRun (mkBitstream bs)
于 2018-06-26T11:13:51.500 回答