3

我有一些 ASCII 文件,总共包含大约 1700 万行,每行/大多数行中有一个固定的 36 字节标识符。所以我的数据是矩形的:我有很多固定宽度的行。使用 Haskell,我想读取所有行,使用正则表达式提取标识符(我很好),然后对它们进行排序并计算唯一标识符的数量(非常接近grep | sort | uniq)。(我已经通过并行读取每个文件来进行并行化。)听起来像一个简单的问题,但是......

我发现即使在排序阶段之前,也很难从这个问题中获得体面的表现。这是我所知道的。 String对于 36 字节的 ASCII 来说太过分了,所以我想使用ByteString. 但是大小为 1700 万的(链接)列表似乎是个坏主意,所以我尝试了IOVector ByteString. 但这似乎很慢。我相信垃圾收集正在遭受痛苦,因为我在向量中保留了数百万个小字节字符串:GC 花费的时间至少是代码的 3 倍(根据+RTS -s),我认为随着程序继续运行它只会变得更糟。

我在想我应该使用 Repa 或某种单一的巨型ByteString// IOVector Char8whatever(因为我知道每行的确切宽度是 36)为每个线程将数据存储在一个巨大的矩形数组中,这应该可以缓解 GC 问题. 但是,之后我仍然需要对行进行排序,并且 Repa 似乎不支持排序,我不想自己编写排序算法。所以我不知道如何拥有一个巨大的矩形阵列,但仍然对其进行排序。

对要使用的库、要设置的 GC 标志或其他什么的建议?这台机器有 24 个内核和 24GB 的 RAM,所以我不受硬件限制。我想留在 Haskell 中,因为我有很多相关的代码(也用于解析相同的文件并生成摘要统计信息)工作正常,而且我不想重写它。

4

5 回答 5

1

我相信垃圾收集正在遭受痛苦,因为我在向量中保留了数百万个小字节字符串

可疑的。不应收集保留的字节字符串。也许您的代码中某处存在过多的数据复制?

  • ByteString是一个标头(8 个字节)+ ForeignPtr Word8ref(8 个字节)+Int偏移量(4 个字节)+Int长度(4 个字节)

  • ForeignPtr是一个标头(8 个字节)+ Addr#(8 个字节)+ PlainPtrref(8 个字节)

  • PlainPtr是一个标头(8 个字节)+ MutableByteArray#ref(8 个字节)

(根据https://stackoverflow.com/a/3256825/648955修改)

总而言之,ByteString开销至少是64 个字节(对我来说,某些字段是共享的)。

编写自己的数据管理:大平面Word8数组和临时偏移包装器

newtype ByteId = ByteId { offset :: Word64 }

Ord实例。

每个标识符的开销为8 个字节。将偏移存储在 unboxed 中Vector。用这样的东西排序:http ://hackage.haskell.org/packages/archive/vector-algorithms/0.5.4.2/doc/html/Data-Vector-Algorithms-Intro.html#v:sort

于 2013-08-22T15:12:04.083 回答
0

Array类型系列具有对多维数组的内置支持。索引可以是任何带有Ix实例的东西,特别是对于您的 case (Int, Int)。不幸的是,它也不支持排序。

但是对于您的用例,您真的需要排序吗?如果您有一个从标识符到 Int 的映射,您可以随时增加计数,然后选择值为 1 的所有键。您可以查看该bytestring-trie包,尽管对于您的用例,它建议使用hashmap.

另一种算法是携带两个集合(例如 HashSet),一个带有只看到一次的标识符,一个带有多次看到的标识符,并且您在遍历列表时更新这些集合。

另外,你如何读取你的文件:如果你把它作为一个大的 ByteString 读取并仔细地从中构造小的 ByteString 对象,它们实际上只是指向大文件的大块内存的指针,可能会消除你的 GC 问题。但是要评估我们需要查看您的代码。

于 2013-08-22T14:16:36.940 回答
0

尝试应该工作。在具有 4 gig RAM 的双核笔记本电脑上,这段代码需要 45 分钟处理 1800 万行、600 万个唯一键的文件:

--invoked:  test.exe +RTS -K3.9G -c -h
import qualified Data.ByteString.Char8 as B
import qualified Data.Trie as T

file = "data.txt"
main = ret >>= print
ret = do  -- retreive the data
    ls <- B.readFile file >>= return.B.lines
    trie <- return $ tupleUp ls
    return $ T.size trie 
tupleUp:: [B.ByteString] -> T.Trie Int
tupleUp l = foldl f T.empty l
f acc str = case T.lookup str acc 
            of Nothing -> T.insert str 1 acc
               Just n ->  T.adjust (+1) str acc

这是用于生成数据文件的代码(6MM 密钥,然后 3 份复制到 1 个文件中以获得 18MM 密钥:

import qualified Data.ByteString.Char8 as BS
import System.Random
import Data.List.Split

file = "data.txt"
numLines = 6e6 --17e6
chunkSize = 36
charSet = ['a'..'z'] ++ ['A'..'Z'] ++ ['0'..'9']

-- generate the file
gen = do
  randgen <- getStdGen
  dat <- return $ t randgen
  writeFile file (unlines dat)

t gen = take (ceiling numLines) $ charChunks
    where
      charChunks = chunksOf chunkSize chars
      chars = map (charSet!!) rands
      rands = randomRs (0,(length charSet) -1) gen

main = gen
于 2013-08-23T19:35:44.037 回答
0

那么,我们能做到多快呢?让我们用@ja. 的代码生成的文件做一些测试:

cat data.txt > /dev/null
  --> 0.17 seconds

在 Haskell 中也一样吗?

import qualified Data.ByteString as B

f = id

main = B.readFile "data.txt" >>= return . f >>= B.putStr

定时?

time ./Test > /dev/null
  --> 0.32 seconds

需要两倍的时间,但我想这还不错。使用严格的字节串,因为我们想在一秒钟内将其分块。

接下来,我们可以使用Vector还是太慢了?让我们构建一个Vector块并将它们重新组合在一起。我使用blaze-builder包来优化输出。

import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
import qualified Data.Vector as V
import qualified Blaze.ByteString.Builder as BB
import Data.Monoid

recordLen = 36
lineEndingLen = 2 -- Windows! change to 1 for Unix
numRecords = (`div` (recordLen + lineEndingLen)) . B.length 

substr idx len = B.take len . B.drop idx
recordByIdx idx = substr (idx*(recordLen+lineEndingLen)) recordLen

mkVector :: B.ByteString -> V.Vector (B.ByteString)
mkVector bs = V.generate (numRecords bs) (\i -> recordByIdx i bs)

mkBS :: V.Vector (B.ByteString) -> L.ByteString
mkBS = BB.toLazyByteString . V.foldr foldToBS mempty
     where foldToBS :: B.ByteString -> BB.Builder -> BB.Builder
           foldToBS = mappend . BB.fromWrite . BB.writeByteString

main = B.readFile "data.txt" >>= return . mkBS . mkVector >>= L.putStr

多久时间?

time ./Test2 > /dev/null
  --> 1.06 seconds

一点也不差!即使您使用正则表达式而不是我的固定块位置来读取行,我们仍然可以得出结论,您可以将块放入 aVector中而不会严重影响性能。

还剩下什么?排序。理论上,桶排序应该是此类问题的理想算法。我尝试自己实现一个:

import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
import qualified Data.Vector as V
import qualified Data.Vector.Generic.Mutable as MV
import qualified Blaze.ByteString.Builder as BB
import Data.Monoid
import Control.Monad.ST
import Control.Monad.Primitive

recordLen = 36
lineEndingLen = 2 -- Windows! change to 1 for Unix
numRecords = (`div` (recordLen + lineEndingLen)) . B.length 

substr idx len = B.take len . B.drop idx
recordByIdx idx = substr (idx*(recordLen+lineEndingLen)) (recordLen+lineEndingLen)

mkVector :: B.ByteString -> V.Vector (B.ByteString)
mkVector bs = V.generate (numRecords bs) (\i -> recordByIdx i bs)

mkBS :: V.Vector (B.ByteString) -> L.ByteString
mkBS = BB.toLazyByteString . V.foldr foldToBS mempty
     where foldToBS :: B.ByteString -> BB.Builder -> BB.Builder
           foldToBS = mappend . BB.fromWrite . BB.writeByteString

bucketSort :: Int -> V.Vector B.ByteString -> V.Vector B.ByteString
bucketSort chunkSize v = runST $ emptyBuckets >>= \bs -> 
                                 go v bs lastIdx (chunkSize - 1)
           where lastIdx = V.length v - 1

                 emptyBuckets :: ST s (V.MVector (PrimState (ST s)) [B.ByteString])
                 emptyBuckets = V.thaw $ V.generate 256 (const [])

                 go :: V.Vector B.ByteString -> 
                       V.MVector (PrimState (ST s)) [B.ByteString] -> 
                       Int -> Int -> ST s (V.Vector B.ByteString)
                 go v _ _ (-1) = return v
                 go _ buckets (-1) testIdx = do
                    v' <- unbucket buckets
                    bs <- emptyBuckets
                    go v' bs lastIdx (testIdx - 1)
                 go v buckets idx testIdx = do
                    let testChunk = v V.! idx
                        testByte = fromIntegral $ testChunk `B.index` testIdx
                    b <- MV.read buckets testByte
                    MV.write buckets testByte (testChunk:b)
                    go v buckets (idx-1) testIdx

                 unbucket :: V.MVector (PrimState (ST s)) [B.ByteString] -> 
                             ST s (V.Vector B.ByteString)
                 unbucket v = do 
                          v' <- V.freeze v
                          return . V.fromList . concat . V.toList $ v'

main = B.readFile "data.txt" >>= return . process >>= L.putStr
     where process =  mkBS . 
                      bucketSort (recordLen) . 
                      mkVector

测试它的时间约为 1:50 分钟,这可能是可以接受的。我们谈论的是一个 O(c*n) 算法,n 范围为几百万,常数 c 为 36*something。但我相信你可以进一步优化它。

或者你可以只使用这个vector-algorithms包。使用堆排序进行测试:

import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
import qualified Data.Vector as V
import qualified Blaze.ByteString.Builder as BB
import Data.Vector.Algorithms.Heap (sort)
import Data.Monoid
import Control.Monad.ST

recordLen = 36
lineEndingLen = 2 -- Windows! change to 1 for Unix
numRecords = (`div` (recordLen + lineEndingLen)) . B.length 

substr idx len = B.take len . B.drop idx
recordByIdx idx = substr (idx*(recordLen+lineEndingLen)) (recordLen+lineEndingLen)

mkVector :: B.ByteString -> V.Vector (B.ByteString)
mkVector bs = V.generate (numRecords bs) (\i -> recordByIdx i bs)

mkBS :: V.Vector (B.ByteString) -> L.ByteString
mkBS = BB.toLazyByteString . V.foldr foldToBS mempty
     where foldToBS :: B.ByteString -> BB.Builder -> BB.Builder
           foldToBS = mappend . BB.fromWrite . BB.writeByteString

sortIt v = runST $ do 
       mv <- V.thaw v
       sort mv
       V.freeze mv

main = B.readFile "data.txt" >>= return . process >>= L.putStr
     where process =  mkBS . 
                      sortIt .
                      mkVector

这在我的机器上大约 1:20 分钟内完成了这项工作,所以现在它比我的桶排序更快。两种最终解决方案都消耗 1-1.2 GB RAM。

够好了?

于 2013-08-24T09:58:01.347 回答
0

有几个围绕 mmap 的包装器可用,它们可以为您提供文件中数据的 Ptrs 或大的 ByteString。ByteString 实际上只是一个指针、偏移量、长度元组;将那个大的 ByteString 拆分成一堆小的实际上只是创建了一堆指向大字节子集的新元组。由于您说每条记录在文件中的固定偏移量处,您应该能够创建一堆新记录,而无需通过 ByteString 实际访问任何文件take

我对问题的排序部分没有任何好的建议,但首先避免复制文件数据应该是一个好的开始。

于 2013-08-22T15:55:22.093 回答