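I should start by saying that I'm a beginner with Haskell and the pipes library, and I'd like to understand what causes the high memory usage in the test function of this program.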

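In particular, in the fold in test that produces the r1 value, I see MyRecord values accumulating until the final result is produced, unless deepseq is used. On my sample dataset of ~500000 lines / ~230 MB, memory usage exceeds 1.5 GB.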

The fold that produces the r2 value runs in constant memory.

What I'd like to understand is:

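1) What could cause MyRecord values to build up in the first fold, and why does using deepseq fix it? I threw things at it more or less at random until I arrived at using deepseq to get constant memory usage, but I'd like to understand why it works. Is it possible to get constant memory usage without deepseq while still producing the same Maybe Int result type?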

2) What is different about the second fold that keeps it from exhibiting the same problem?

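I know that if I were working with plain integers rather than tuples I could use the built-in sum function from Pipes.Prelude, but eventually I will want to process the second element, which contains any parse errors.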

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Test where

import           Control.Arrow
import           Control.DeepSeq
import           Control.Monad
import           Data.Aeson
import           Data.Function
import           Data.Maybe
import           Data.Monoid
import           Data.Text (Text)

import           Pipes
import qualified Pipes.Aeson as PA (DecodingError(..))
import qualified Pipes.Aeson.Unchecked as PA
import qualified Pipes.ByteString as PB
import qualified Pipes.Group as PG
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as P

import           System.IO
import           Control.Lens
import qualified Control.Foldl as Fold

data MyRecord = MyRecord
  { myRecordField1 :: !Text
  , myRecordField2 :: !Int
  , myRecordField3 :: !Text
  , myRecordField4 :: !Text
  , myRecordField5 :: !Text
  , myRecordField6 :: !Text
  , myRecordField7 :: !Text
  , myRecordField8 :: !Text
  , myRecordField9 :: !Text
  , myRecordField10 :: !Int
  , myRecordField11 :: !Text
  , myRecordField12 :: !Text
  , myRecordField13 :: !Text
  } deriving (Eq, Show)

instance FromJSON MyRecord where
  parseJSON (Object o) =
    MyRecord <$> o .: "field1" <*> o .: "field2" <*> o .: "field3" <*>
    o .: "field4" <*>
    o .: "field5" <*>
    o .: "field6" <*>
    o .: "field7" <*>
    o .: "field8" <*>
    o .: "field9" <*>
    (read <$> o .: "field10") <*>
    o .: "field11" <*>
    o .: "field12" <*>
    o .: "field13"
  parseJSON x = fail $ "MyRecord: expected Object, got: " <> show x

instance ToJSON MyRecord where
    toJSON _ = undefined

test :: IO ()
test = do
  withFile "some-file" ReadMode $ \hIn
  {-

      the pipeline is composed as follows:

      1 a producer reading a file with Pipes.ByteString, splitting chunks into lines,
        and parsing the lines as JSON to produce tuples of (Maybe MyRecord, Maybe
        ByteString), the second element being an error if parsing failed

      2 a pipe filtering that tuple on a field of Maybe MyRecord, passing matching
        (Maybe MyRecord, Maybe ByteString) downstream

      3 and a pipe that picks an Int field out of Maybe MyRecord, passing (Maybe Int,
        Maybe ByteString) downstream

      pipeline == 1 >-> 2 >-> 3

      memory profiling indicates the memory build up is due to accumulation of
      MyRecord "objects", and data types comprising their fields (mainly
      Text/ARR_WORDS)

  -}
   -> do
    let pipeline = f1 hIn >-> f2 >-> f3
    -- need to use deepseq to avoid leaking memory
    r1 <-
      P.fold
        (\acc (v, _) -> (+) <$> acc `deepseq` acc <*> pure (fromMaybe 0 v))
        (Just 0)
        id
        (pipeline :: Producer (Maybe Int, Maybe PB.ByteString) IO ())
    print r1
    hSeek hIn AbsoluteSeek 0
    -- this works just fine as is and streams in constant memory
    r2 <-
      P.fold
        (\acc v ->
           case fst v of
             Just x -> acc + x
             Nothing -> acc)
        0
        id
        (pipeline :: Producer (Maybe Int, Maybe PB.ByteString) IO ())
    print r2
    return ()
  return ()

f1
  :: (FromJSON a, MonadIO m)
  => Handle -> Producer (Maybe a, Maybe PB.ByteString) m ()
f1 hIn = PB.fromHandle hIn & asLines & resumingParser PA.decode

f2
  :: Pipe (Maybe MyRecord, Maybe PB.ByteString) (Maybe MyRecord, Maybe PB.ByteString) IO r
f2 = filterRecords (("some value" ==) . myRecordField5)

f3 :: Pipe (Maybe MyRecord, d) (Maybe Int, d) IO r
f3 = P.map (first (fmap myRecordField10))

filterRecords
  :: Monad m
  => (MyRecord -> Bool)
  -> Pipe (Maybe MyRecord, Maybe PB.ByteString) (Maybe MyRecord, Maybe PB.ByteString) m r
filterRecords predicate =
  for cat $ \(l, e) ->
    when (isNothing l || (predicate <$> l) == Just True) $ yield (l, e)

asLines
  :: Monad m
  => Producer PB.ByteString m x -> Producer PB.ByteString m x
asLines p = Fold.purely PG.folds Fold.mconcat (view PB.lines p)

parseRecords
  :: (Monad m, FromJSON a, ToJSON a)
  => Producer PB.ByteString m r
  -> Producer a m (Either (PA.DecodingError, Producer PB.ByteString m r) r)
parseRecords = view PA.decoded

resumingParser
  :: Monad m
  => PP.StateT (Producer a m r) m (Maybe (Either e b))
  -> Producer a m r
  -> Producer (Maybe b, Maybe a) m ()
resumingParser parser p = do
  (x, p') <- lift $ PP.runStateT parser p
  case x of
    Nothing -> return ()
    Just (Left _) -> do
      (x', p'') <- lift $ PP.runStateT PP.draw p'
      yield (Nothing, x')
      resumingParser parser p''
    Just (Right b) -> do
      yield (Just b, Nothing)
      resumingParser parser p'

1 Answer


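As stated in the documentation for Pipes.foldl, the fold is strict. However, the strictness is implemented with $!, which only forces evaluation to WHNF (weak head normal form). WHNF is enough to fully evaluate a simple type like Int, but it is not enough to fully evaluate a more complex type like Maybe Int.

Some examples: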

main1 = do
  let a = 3 + undefined
      b = seq a 10
  print b                -- error: Exception: Prelude.undefined

main2 = do
  let a = Just (3 + undefined)
      b = seq a 10
  print b                -- no exception

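In the first case, the variable acc is a Just wrapping one big thunk: the sum of all the elements. On each iteration acc goes from Just a to Just (a+b) to Just (a+b+c), and so on. The addition is not performed during the fold; it only happens at the very end. The high memory usage comes from keeping this ever-growing sum in memory. Each pending term presumably still references the record its value was taken from, which would explain the MyRecord values seen in the profile.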

In the second case, the $! applied on each iteration reduces the sum to a plain Int.
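On the question of whether constant memory is possible without deepseq while keeping the Maybe Int result: one possible approach, sketched here as an assumption rather than something taken from the original code, is to pattern match on the accumulator and force the new sum before wrapping it in Just. The name stepStrict and the sample rows are hypothetical, and Data.List.foldl' stands in for P.fold because it also forces each new accumulator to WHNF only.

```haskell
{-# LANGUAGE BangPatterns #-}
module Main where

import Data.List (foldl')
import Data.Maybe (fromMaybe)

-- Hypothetical step function (not from the original post).
-- Evaluating its result to WHNF first runs the pattern match on the
-- accumulator and then the bang pattern, so the addition is performed
-- before the Just constructor is returned.
stepStrict :: Maybe Int -> (Maybe Int, e) -> Maybe Int
stepStrict Nothing  _      = Nothing
stepStrict (Just a) (v, _) =
  let !s = a + fromMaybe 0 v   -- force the sum now instead of storing a thunk
  in Just s

main :: IO ()
main = print (foldl' stepStrict (Just 0) rows)
  where
    -- Made-up stand-in for the (Maybe Int, Maybe ByteString) stream.
    rows :: [(Maybe Int, Maybe String)]
    rows = [(Just n, Nothing) | n <- [1 .. 1000000]]
```

The same step function should be usable as the first argument of P.fold, since that fold also applies $! to each new accumulator.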

Instead of using deepseq, you can also use force:

force x = x `deepseq` x
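To make the difference between WHNF and full evaluation concrete, here is a small sketch that revisits the Just (3 + undefined) value from the examples above. The helpers threw and throwsWhnf are hypothetical names introduced only for this illustration; evaluate and try come from Control.Exception, and force from Control.DeepSeq.

```haskell
module Main where

import Control.DeepSeq (force)
import Control.Exception (SomeException, evaluate, try)

-- Hypothetical helper: did the evaluation end in an exception?
threw :: Either SomeException a -> Bool
threw = either (const True) (const False)

-- Hypothetical helper: evaluate a value to WHNF and report whether it threw.
throwsWhnf :: a -> IO Bool
throwsWhnf x = threw <$> try (evaluate x)

main :: IO ()
main = do
  let lazyJust = Just (3 + undefined) :: Maybe Int
  -- WHNF stops at the Just constructor, so the payload is never touched.
  throwsWhnf lazyJust >>= print          -- prints False
  -- Bringing (force x) to WHNF evaluates x completely, payload included.
  throwsWhnf (force lazyJust) >>= print  -- prints True
```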

And as mentioned in the deepseq documentation, in combination with ViewPatterns you can create a pattern that fully evaluates a function argument:

{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ViewPatterns #-}

...
P.fold
  (\(force -> !acc) (v,_) -> (+) <$> acc <*> pure (fromMaybe 0 v))
  (Just 0)
  ...
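For reference, a self-contained variant of the snippet above might look like the following. It is only a sketch: the sample producer and the sumFirsts name are made up for illustration, and String stands in for the ByteString error element of the original pipeline.

```haskell
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ViewPatterns #-}
module Main where

import Control.DeepSeq (force)
import Data.Maybe (fromMaybe)
import Pipes (Producer, each)
import qualified Pipes.Prelude as P

-- Made-up sample data standing in for the parsed JSON lines.
sample :: Monad m => Producer (Maybe Int, Maybe String) m ()
sample = each [(Just 1, Nothing), (Nothing, Just "bad line"), (Just 2, Nothing)]

-- Hypothetical wrapper around P.fold: the view pattern fully evaluates
-- the previous accumulator before the next step builds on it.
sumFirsts :: Monad m => Producer (Maybe Int, e) m () -> m (Maybe Int)
sumFirsts =
  P.fold
    (\(force -> !acc) (v, _) -> (+) <$> acc <*> pure (fromMaybe 0 v))
    (Just 0)
    id

main :: IO ()
main = sumFirsts sample >>= print
```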
answered 2016-09-06T19:01:19.153