Given a (strict) ByteString, what is the most efficient way to count how many of each possible byte it contains?

I see that sort is supposed to be implemented as a counting sort, but there doesn't seem to be a way to access the relevant counts. I also see there is a count function, which counts how many times a given byte appears. That gives me the following options:

map (\ b -> count b str) [0x00 .. 0xFF]
map length . group . sort
some fold* over an IntMap of byte frequencies

Which is likely to give me the best performance?
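For concreteness, the first option can be written out as a small self-contained module (the module layout and the countAll name are mine, not from the question):

```haskell
-- Sketch of the first option: one full pass over the ByteString per
-- possible byte value, using Data.ByteString.count.
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as C

countAll :: B.ByteString -> [Int]
countAll str = map (\b -> B.count b str) [0x00 .. 0xFF]

main :: IO ()
main = print (sum (countAll (C.pack "hello")))  -- prints 5, the string length
```

Note that the counts over all 256 byte values always sum to the length of the string, which gives a quick sanity check.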
dflemstr's basic idea is of course right, but since you want the best performance, you need to use unchecked accesses both to the ByteString and to the count array, for example:
import Control.Monad.ST
import Data.Array.ST
import Data.Array.Base (unsafeRead, unsafeWrite)
import Data.Array.Unboxed
import Data.Word

import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.ByteString.Unsafe

histogram :: ByteString -> UArray Word8 Int
histogram bs = runSTUArray $ do
    hist <- newArray (0, 255) 0
    let l = BS.length bs
        update b = do
            o <- unsafeRead hist b
            unsafeWrite hist b (o + 1)
        loop i
            | i < 0 = return hist
            | otherwise = do
                update $ fromIntegral (bs `unsafeIndex` i)
                loop (i - 1)
    loop (l - 1)
According to criterion (building the histogram of a 200000-byte ByteString), that makes a big difference:
warming up
estimating clock resolution...
mean is 1.667687 us (320001 iterations)
found 3078 outliers among 319999 samples (1.0%)
1947 (0.6%) high severe
estimating cost of a clock call...
mean is 40.43765 ns (14 iterations)
benchmarking dflemstr
mean: 21.42852 ms, lb 21.05213 ms, ub 21.77954 ms, ci 0.950
std dev: 1.873897 ms, lb 1.719565 ms, ub 2.038779 ms, ci 0.950
variance introduced by outliers: 74.820%
variance is severely inflated by outliers
benchmarking unsafeIndex
mean: 312.6447 us, lb 304.3425 us, ub 321.0795 us, ci 0.950
std dev: 42.86886 us, lb 39.64363 us, ub 46.52899 us, ci 0.950
variance introduced by outliers: 88.342%
variance is severely inflated by outliers
(I changed dflemstr's code to also use runSTUArray and return a UArray Word8 Int, so that both have a uniform return value, but that doesn't have a large impact on the running time.)
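For comparison, here is a bounds-checked counterpart to the unchecked histogram above (my sketch, with my naming): the same loop shape, but using the safe index, readArray, and writeArray instead of their unsafe variants. This is the version the answer says you pay for:

```haskell
-- Bounds-checked counterpart of the unchecked histogram (sketch):
-- identical loop, but every ByteString and array access is checked.
import Control.Monad.ST (ST, runST)
import Data.Array.ST (STUArray, newArray, readArray, writeArray, getElems)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as C
import Data.Word (Word8)

histSafe :: B.ByteString -> [Int]
histSafe bs = runST $ do
    hist <- newArray (minBound, maxBound) 0 :: ST s (STUArray s Word8 Int)
    let loop i
          | i < 0     = return ()
          | otherwise = do
              let b = B.index bs i                       -- checked indexing
              readArray hist b >>= writeArray hist b . (+ 1)
              loop (i - 1)
    loop (B.length bs - 1)
    getElems hist
```

Benchmarking this against the unchecked version isolates exactly how much the bounds checks cost.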
The most efficient way probably involves using a mutable array to store the counts. This is potentially one of the most efficient O(n) solutions:
import Control.Monad
import Control.Monad.ST
import Data.Array.ST
import Data.ByteString (ByteString)
import qualified Data.ByteString as ByteString
import Data.Word

byteHistogram :: ByteString -> [Int]
byteHistogram bs = runST $ do
  histogram <- newArray (minBound, maxBound) 0 :: ST s (STUArray s Word8 Int)
  forM_ (ByteString.unpack bs) $ \ byte ->
    readArray histogram byte >>= return . (+1) >>= writeArray histogram byte
  getElems histogram
Well, you can guess, or you can write the program and measure it. The results might surprise you.
import Data.ByteString as B
import Data.IntMap.Strict as I
import qualified Data.Vector.Unboxed.Mutable as M
import Data.Vector.Unboxed as V
import Criterion
import Criterion.Main
import System.Entropy
import Data.Word

main = do
  bs <- getEntropy 1024
  defaultMain [ bench "map count" $ nf mapCount bs
              , bench "map group sort" $ nf mapGroup bs
              , bench "fold counters" $ nf mapFoldCtr bs
              , bench "vCount" $ nf vectorCount bs
              ]

-- O(n*m) (length of bytestring, length of list of elements being counted up)
-- My guess: bad
mapCount :: ByteString -> [Int]
mapCount bs = Prelude.map (`B.count` bs) [0x00..0xFF]

-- Notice that B.sort uses counting sort, so there's already lots of
-- duplicate work done here.
-- O() isn't such a big deal as the use of lists - likely allocation and
-- large constant factors.
mapGroup :: ByteString -> [Int]
mapGroup = Prelude.map Prelude.length . Prelude.map B.unpack . B.group . B.sort

mapFoldCtr :: ByteString -> [Int]
mapFoldCtr bs = I.elems $ B.foldl' cnt I.empty bs
  where
    cnt :: I.IntMap Int -> Word8 -> I.IntMap Int
    cnt m k = I.insertWith (+) (fromIntegral k) 1 m

vectorCount :: B.ByteString -> [Int]
vectorCount bs = V.toList $ V.create $ do
  v <- M.new 256
  Prelude.mapM_ (\i -> M.unsafeWrite v i 0) [0..255]
  Prelude.mapM_ (\i -> M.unsafeRead v (fromIntegral i) >>=
                       M.unsafeWrite v (fromIntegral i) . (+1)) (B.unpack bs)
  return v
The results (shortened) reflect rather well on map/group/sort, but unsurprisingly put the unboxed mutable vector/array style solution in the lead:
benchmarking map count
mean: 308.7067 us, lb 307.3562 us, ub 310.5099 us, ci 0.950
std dev: 7.942305 us, lb 6.269114 us, ub 10.08334 us, ci 0.950
benchmarking map group sort
mean: 43.03601 us, lb 42.93492 us, ub 43.15815 us, ci 0.950
std dev: 567.5979 ns, lb 486.8838 ns, ub 666.0098 ns, ci 0.950
benchmarking fold counters
mean: 191.5338 us, lb 191.1102 us, ub 192.0366 us, ci 0.950
std dev: 2.370183 us, lb 1.995243 us, ub 2.907595 us, ci 0.950
benchmarking vCount
mean: 12.98727 us, lb 12.96037 us, ub 13.02261 us, ci 0.950
std dev: 156.6505 ns, lb 123.6789 ns, ub 198.4892 ns, ci 0.950
Strangely, when I increase the ByteString size to 200K, as Daniel used, map/group/sort clocks in at around 250 us while the vector solution takes over 500 us:
benchmarking map count
mean: 5.796340 ms, lb 5.788830 ms, ub 5.805126 ms, ci 0.950
std dev: 41.65349 us, lb 35.69293 us, ub 48.39205 us, ci 0.950
benchmarking map group sort
mean: 260.7405 us, lb 259.2525 us, ub 262.4742 us, ci 0.950
std dev: 8.247289 us, lb 7.127576 us, ub 9.371299 us, ci 0.950
benchmarking fold counters
mean: 3.915101 ms, lb 3.892415 ms, ub 4.006287 ms, ci 0.950
std dev: 201.7632 us, lb 43.13063 us, ub 469.8170 us, ci 0.950
benchmarking vCount
mean: 556.5588 us, lb 545.4895 us, ub 567.9318 us, ci 0.950
std dev: 57.44888 us, lb 51.22270 us, ub 65.91105 us, ci 0.950
found 1 outliers among 100 samples (1.0%)
variance introduced by outliers: 80.038%
variance is severely inflated by outliers
But the difference there is huge. Perhaps some playing with heap sizes would make it go away (at least in the benchmark program), but that isn't quick or easy for me.
(Don't take this too seriously.)
The (really) fastest solution, and a pure FP one, is this... almost:
data Hist = Hist { v00 :: Int, v01 :: Int {- , v02 :: Int, ... -} }

emptyHist :: Hist
emptyHist = Hist 0 0 {- 0 0 ... -}

foldRecord :: B.ByteString -> [Int]
foldRecord = histToList . B.foldl' cnt emptyHist
  where histToList (Hist x00 x01 {- x02 ... -}) = [x00, x01 {- , x02, ... -}]
        cnt (Hist !x00 !x01) 0x00 = Hist (x00 + 1) x01 {- x02 ... -}
        cnt (Hist !x00 !x01) {- 0x01 -} _ = Hist x00 (x01 + 1) {- x02 ... -}
        {- ... -}
Using @Thomas's benchmark it runs in 11.67 us (the previously fastest vCount takes 14.99 us on my machine).

The problem comes when cnt is split into the 256 possible patterns (the full equivalent code, generated with lens, is here). The compiler is slow either at selecting the right pattern (the left-hand side of cnt) or at performing the increment (the right-hand side of cnt), but I think it should generate efficient code (at the very least, code as efficient as the two-pattern version).

(With 256 cnt patterns and 256 Hist fields it takes 1.35 ms!!!)

(On my machine, map group sort takes 42 us, behind the vCount alternative.)
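A self-contained two-bucket instance of the record idea above (my completion of the elided fields: one strict field for byte 0x00, with every other byte lumped into the second field) compiles and runs as:

```haskell
{-# LANGUAGE BangPatterns #-}
-- Two-bucket version of the strict-record histogram (sketch): field one
-- counts occurrences of byte 0x00, field two counts everything else.
import qualified Data.ByteString as B

data Hist = Hist !Int !Int

foldRecord :: B.ByteString -> [Int]
foldRecord = histToList . B.foldl' cnt (Hist 0 0)
  where
    histToList (Hist x00 rest)   = [x00, rest]
    cnt (Hist !x00 !rest) 0x00 = Hist (x00 + 1) rest
    cnt (Hist !x00 !rest) _    = Hist x00 (rest + 1)

main :: IO ()
main = print (foldRecord (B.pack [0, 1, 0, 2]))  -- prints [2,2]
```

The full 256-field version follows the same shape, with one strict field and one cnt equation per byte value.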