5

对于仅 25mb 的文件,内存使用量恒定为 792mb!我认为这与我从列表中的使用有关,但是移动向量代码的某些部分(例如,应用 fft 的数组)并没有改变正在使用的内存量!

{-# LANGUAGE OverloadedStrings,BangPatterns #-}
import qualified Data.Attoparsec.Char8 as Ap
import Data.Attoparsec
import Control.Monad
import Control.Applicative
--import Control.DeepSeq (force)
import System.IO 
import System.Environment
import Data.List (zipWith4,unzip4,zip4,foldl')
import Data.Bits
import Data.Complex
import Data.String (fromString)
import Data.ByteString.Internal
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as Bl 
import qualified Data.Vector.Unboxed as Vu
import qualified Statistics.Transform  as St



{-
I run a test on a collection of data from a file
[(1,t),(2,t),(3,t),(4,t),(5,t)]
   -     -     - 
   |     -     -     -
   |     |     -     -     -
   |     |     |
 [y++t,  n,  y++t]
To do that, I use splitN to create a list of list
[[(1,t),(2,t),(3,t)],[(2,t),(3,t),(4,t)],[(3,t),(4,t),(5,t)]]
Map a serie of functions to determine a value for each inner collection,
and return when an event happened.



-}

data FourD b a = FourD  a a a b

instance Functor (FourD c) where  
    fmap f (FourD x y z d) = FourD  (f x) (f y) (f z) d  

mgrav_per_bit = [ 18, 36, 71, 143, 286, 571, 1142 ]
--Converting raw data to mg
aToG :: Int -> Double    
aToG a = fromIntegral . sign $  uresult 
    where   
        twocomp = if a>128
                  then 256-a
                  else a
        uresult = sum  $ zipWith (*)   mgrav_per_bit (map (fromEnum . testBit  twocomp) [0..7])
        sign = if a > 128 
               then negate 
               else id


--Data is (int,int,int,time)
--Converted to (St.CD^3,Bytestring) in place of maping afterwards.                  
parseAcc :: Parser (FourD B.ByteString St.CD)
parseAcc = do   Ap.char '('
                x <-  fmap ((:+0) . aToG) Ap.decimal  
                Ap.char ','
                y <-  fmap ((:+0) . aToG) Ap.decimal
                Ap.char ','
                z <-  fmap ((:+0) . aToG) Ap.decimal
                Ap.char ','
                time <- takeTill (== 41)
                Ap.char ')'
                return $! FourD x y z time
--applies parseAcc to many lines, fails at the end of file (Need to add a newline)
parseFile = many $ parseAcc <* (Ap.endOfInput <|> Ap.endOfLine)


readExpr input = case parse parseFile  input of
     Done b val -> val
     Partial p -> undefined
     Fail a b c -> undefined 

unType  (FourD  x y d z) = (x ,y ,d ,z)          


-- Breaks a list of FourD into smaller lists, apply f and g to those lists, then filter the result based if an even happened or not
amap  :: (Num c, Ord c) =>     ([a] -> [c]) -> ([d] -> [ByteString]) -> [FourD d a] -> [Bl.ByteString]
amap f g = (uncurry4 (zipWith4 (filterAcc))). map4 f g . unzip4 . map (unType)
    where map4 f g (a,b,c,d) = (f a,f b,f c,g d)
          uncurry4 f (a,b,c,d) = f a b c d 

-- before  i had map filterAcc,outside amap. Tried to fuse everything to eliminate intermediaries

-- An event is detected if x > 50
filterAcc  x y z t = if x > 50
                                then  (Bl.pack . B.unpack) $ "yes: " `B.append`  t  
                                else  ""
-- split [St.CD] in [(Vector St.CD)], apply fft to each, and compress to a single value. 
-- Core of the application
fftAcross :: [St.CD] -> [Int]
fftAcross = map (floor . noiseEnergy .  St.fft) . splitN 32 

-- how the value is determined (sum of all magnitudes but the first one)
noiseEnergy  :: (RealFloat a, Vu.Unbox a) => Vu.Vector (Complex a) -> a
noiseEnergy  x = (Vu.foldl' (\b a-> b+(magnitude a)) 0 (Vu.drop 1 x))/32

-- how the values are split in (Vector St.CD), if lenght > 32, takes 32, otherwhise I'm done
splitN :: Vu.Unbox a => Int -> [a] -> [Vu.Vector a]
splitN n x =  helper x 
    where
    helper x   = if     atLeast n x 
                 then   (Vu.take n (Vu.fromList x)) : (helper  (drop 1 x) )
                 else  []
-- Replacing the test by atLeast in place of a counter (that compared to length x,calculated once) reduced the behaviour that memory usage was constant.     

-- this is replicated so the behaviour of splitN happens on the time part of FourD, Can't use the same since there is no Vector Bytestring instance                
splitN2 n x =  helper x 
    where
    helper x   = if   atLeast n x 
                 then  (head   x) : (helper  (drop 1 x))
                 else  []

atLeast :: Int -> [a] -> Bool
atLeast 0 _      = True
atLeast _ []     = False
atLeast n (_:ys) = atLeast (n-1) ys



main = do    

    filename <- liftM head getArgs
    filehandle <- openFile "results.txt" WriteMode
    contents <- liftM readExpr $ B.readFile filename
    Bl.hPutStr (filehandle) .  Bl.unlines .  splitAndApplyAndFilter  $ contents where
        splitAndApplyAndFilter  = amap fftAcross (splitN2 32)  

编辑:经过一些重构,融合了一些地图,减少了长度,我设法用 25mb 的输入文件让它在 400~ 处工作。不过,在 100mb 上,它需要 1.5gb。

该程序旨在确定某个事件是否在某个时间点发生,因为它需要一组值(我使用 32 atm),在其中运行 fft,对这些值求和并查看是否超过阈值。如果是,则将时间打印到文件中。

http://db.tt/fT8kXPKz用于 25mb 测试文件

4

1 回答 1

3

由于reddit中关于相同问题的主题,我找到了解决方案! 使用 Haskell 和 Attoparsec 进行解析

我的大部分问题是由于 attoparsec 是严格的并且 haskell 数据相当大的事实引起的(因此 100mb 的文本文件实际上可以在运行时更多)

另一半是分析使内存使用加倍,我没有考虑到这一点。

将解析器更改为惰性后,我的程序使用 120mb 代替 800mb(当输入大小为 116mb 时),所以成功了!

如果有人对此感兴趣,这里是相关的代码更改:

readExpr input = case parse (parseAcc<*(Ap.endOfLine<*Ap.endOfInput<|>Ap.endOfLine)) input of
     Done b val -> val : readExpr b
     Partial  e -> []
     Fail _ _ c -> error c 

完整代码:

{-# LANGUAGE OverloadedStrings,BangPatterns #-}
import qualified Data.Attoparsec.Char8 as Ap
import Data.Attoparsec
import Control.Monad
import Control.Applicative
--import Control.DeepSeq (force)
import System.IO 
import System.Environment
import Data.List (zipWith4,unzip4,zip4,foldl')
import Data.Bits
import Data.Complex
import Data.String (fromString)
import Data.ByteString.Internal
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as Bl 
import qualified Data.Vector.Unboxed as Vu
import qualified Statistics.Transform  as St


{-
I run a test on a collection of data from a file
[(1,t),(2,t),(3,t),(4,t),(5,t)]
   -     -     - 
   |     -     -     -
   |     |     -     -     -
   |     |     |
 [y++t,  n,  y++t]
To do that, I use splitN to create a list of list
[[(1,t),(2,t),(3,t)],[(2,t),(3,t),(4,t)],[(3,t),(4,t),(5,t)]]
Map a serie of functions to determine a value for each inner collection,
and return when an event happened.



-}

data FourD b a = FourD  a a a b

instance Functor (FourD c) where  
    fmap f (FourD x y z d) = FourD  (f x) (f y) (f z) d  

mgrav_per_bit = [ 18, 36, 71, 143, 286, 571, 1142 ]
--Converting raw data to mg
aToG :: Int -> Double    
aToG a = fromIntegral . sign $  uresult 
    where   
        twocomp 
            | a>128     = 256-a
            | otherwise =     a
        uresult = sum  $ zipWith (*)   mgrav_per_bit (map (fromEnum . testBit  twocomp) [0..7])
        sign 
            | a > 128   = negate
            | otherwise =     id


--Data is (int,int,int,time)
--Converted to (St.CD^3,Bytestring) in place of maping afterwards.                  
parseAcc :: Parser (FourD B.ByteString St.CD)
parseAcc = do   Ap.char '('
                x <-  fmap ((:+0) . aToG) Ap.decimal  -- Parse, transform to mg, convert to complex
                Ap.char ','
                y <-  fmap ((:+0) . aToG) Ap.decimal
                Ap.char ','
                z <-  fmap ((:+0) . aToG) Ap.decimal
                Ap.char ','
                time <- takeTill (== 41)
                Ap.char ')'
                return $! FourD x y z time
--applies parseAcc to many lines, fails at the end of file (Need to add a newline)
parseFile = many $ parseAcc <* (Ap.endOfInput <|> Ap.endOfLine)


readExpr input = case parse (parseAcc<*(Ap.endOfLine<*Ap.endOfInput<|>Ap.endOfLine)) input of
     Done b val -> val : readExpr b
     Partial  e -> []
     Fail _ _ c -> error c 

unType  (FourD  x y d z) = (x ,y ,d ,z)          


-- Breaks a list of FourD into smaller lists, apply f and g to those lists, then filter the result based if an even happened or not
amap  :: (Num c, Ord c) =>     ([a] -> [c]) -> ([d] -> [ByteString]) -> [FourD d a] -> [ByteString]
amap f g = (uncurry4 (zipWith4 (filterAcc))). map4 f g . unzip4 . map (unType)
    where map4 f g (a,b,c,d) = (f a,f b,f c,g d)
          uncurry4 f (a,b,c,d) = f a b c d 

-- before  i had map filterAcc,outside amap. Tried to fuse everything to eliminate intermediaries

-- An event is detected if x > 50
filterAcc  x y z t 
              | x > 50    = t
              | otherwise = ""

-- split [St.CD] in [(Vector St.CD)], apply fft to each, and compress to a single value. 
-- Core of the application
fftAcross :: [St.CD] -> [Int]
fftAcross = map (floor . noiseEnergy .  St.fft) . splitN 32 


-- how the value is determined (sum of all magnitudes but the first one)
noiseEnergy  :: (RealFloat a, Vu.Unbox a) => Vu.Vector (Complex a) -> a
noiseEnergy  x = (Vu.foldl' (\b a-> b+(magnitude a)) 0 (Vu.drop 1 x))/32


-- how the values are split in (Vector St.CD), if lenght > 32, takes 32, otherwhise I'm done
splitN :: Vu.Unbox a => Int -> [a] -> [Vu.Vector a]
splitN n x =  helper x 
    where
    helper x   
            | atLeast n x = (Vu.take n (Vu.fromList x)) : (helper  (drop 1 x) )
            | otherwise   = []

-- Replacing the test by atLeast in place of a counter (that compared to length x,calculated once) reduced the behaviour that memory usage was constant.     

-- this is replicated so the behaviour of splitN happens on the time part of FourD, Can't use the same since there is no Vector Bytestring instance                
splitN2 n x =  helper x 
    where
    helper x   
            | atLeast n x = (head   x) : (helper  (drop 1 x))
            | otherwise   = []

atLeast :: Int -> [a] -> Bool
atLeast 0 _      = True
atLeast _ []     = False
atLeast n (_:ys) = atLeast (n-1) ys

intervalFinder :: [ByteString]->[B.ByteString]
intervalFinder x = helper x ""
    where
    helper (x:xs) "" 
        | x /= ""   = ("Start Time: " `B.append` x `B.append` "\n"):(helper xs x)
        | otherwise = helper xs ""
    helper (x:xs) y
        | x == ""   = ( "End   Time: "`B.append`  y `B.append` "\n\n" ):(helper xs "")
        | otherwise = helper xs x
    helper _ _      = []

main = do
    filename <- liftM head getArgs
    filehandle <- openFile "results.txt" WriteMode
    contents <- liftM readExpr $ B.readFile filename
    Bl.hPutStr (filehandle) .  Bl.fromChunks . intervalFinder . splitAndApplyAndFilter  $ contents 
    hClose filehandle
    where
         splitAndApplyAndFilter  = amap fftAcross (splitN2 32)  





    --contents <- liftM ((map ( readExpr )) . B.lines) $ B.readFile filename


   {-     *Main> let g = liftM ((amap fftAcross (splitN2 32)) . readExpr) $ B.readFile "te
stpattern2.txt"
-}

   -- B.hPutStrLn (filehandle)  . B.unlines . map (B.pack . show ) .  amap (map (floor .quare) .  (filter (/=[])) . map ( (drop 1) . (map (/32)) . fft ) . splitN 32) . map ( fmap(fromIntegral . aToG)) . map readExpr $ contents
于 2012-08-31T06:36:06.967 回答