6

我想使用xml-conduit,特别Text.XML.Stream.Parse是为了懒惰地从大型 XML 文件中提取对象列表。

作为测试用例,我使用了最近重新发布的 StackOverflow 数据转储。为简单起见,我打算从中提取所有用户名stackoverflow.com-Users.7z。即使文件是 a .7zfile也表示它只是 bzip2 压缩的数据(文件末尾可能有一些 7zip 的东西,但现在我不在乎)。

XML 的简化版本是

<users>
    <row id="1" DisplayName="StackOverflow"/>
    ...
    <row id="2597135" DisplayName="Uli Köhler"/>
    ... 
</users>

基于这个之前的问答和Hackage的示例,以 bz2-ed 形式读取示例 XML 对我来说非常适合

但是,当runghc用于运行以下程序时,它运行时不打印任何输出:

{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit (runResourceT, ($$), ($=))
import qualified Data.Conduit.Binary as CB
import Data.Conduit.BZlib
import Data.Conduit
import Data.Text (Text)
import System.IO
import Text.XML.Stream.Parse
import Control.Applicative ((<*))

data User = User {name :: Text} deriving (Show)

parseUserRow = tagName "row" (requireAttr "DisplayName" <* ignoreAttrs) $ \displayName -> do
    return $ User displayName

parseUsers = tagNoAttr "users" $ many parseUserRow

main = do
    users <- runResourceT $ CB.sourceFile "stackoverflow.com-Users.7z" $= bunzip2 $= parseBytes def $$ force "users required" parseUsers
    putStrLn $ unlines $ map show users

我认为出现此问题是因为 Haskell 尝试users在开始打印之前深入评估列表。程序的内存使用量持续增长约每秒 2%(来源:htop)支持这一理论。

如何将结果“直播”到标准输出?我认为这可以通过$$ CB.sinkFile "output.txt"在末尾添加另一个管道语句来实现。但是,此特定版本期望Conduit输出ByteString. 你能指出我从这里去哪里的正确方向吗?

任何帮助将不胜感激!

4

3 回答 3

10

让我首先说 xml-conduit 中的流式帮助程序 API 已经多年没有工作了,并且可能会受益于重新构想在此期间发生在管道上的更改。我认为可能有更好的方法来完成事情。

就是说,让我解释一下您遇到的问题。该many函数创建一个结果列表,并且在完成处理之前不会产生任何值。在您的情况下,有这么多的价值观,这似乎永远不会发生。最终,当整个文件被读取时,整个用户列表将立即显示出来。但这显然不是您要寻找的行为。

相反,您想要做的是创建一个值User这些值一旦准备好就会生成。你想要做的基本上是many用一个新函数替换函数调用,yield每次解析它都会产生结果。一个简单的实现可能是:

yieldWhileJust :: Monad m
               => ConduitM a b m (Maybe b)
               -> Conduit a m b
yieldWhileJust consumer =
    loop
  where
    loop = do
        mx <- consumer
        case mx of
            Nothing -> return ()
            Just x -> yield x >> loop

此外,putStrLn $ unlines $ map show您希望将整个管道附加到消费者,而不是使用,消费者将打印每个单独产生的User值。这可以通过 轻松实现Data.Conduit.List.mapM_,例如:CL.mapM_ (liftIO . print)

我已经根据您的代码整理了一个完整的示例。输入是一个人工生成的无限 XML 文件,只是为了证明它确实是立即产生输出。

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
import           Control.Applicative    ((<*))
import           Control.Concurrent     (threadDelay)
import           Control.Monad          (forever, void)
import           Control.Monad.IO.Class (MonadIO (liftIO))
import           Data.ByteString        (ByteString)
import           Data.Conduit
import qualified Data.Conduit.List      as CL
import           Data.Text              (Text)
import           Data.Text.Encoding     (encodeUtf8)
import           Data.XML.Types         (Event)
import           Text.XML.Stream.Parse

-- instead of actually including a large input data file, just for testing purposes
infiniteInput :: MonadIO m => Source m ByteString
infiniteInput = do
    yield "<users>"
    forever $ do
        yield $ encodeUtf8
            "<row id=\"1\" DisplayName=\"StackOverflow\"/><row id=\"2597135\" DisplayName=\"Uli Köhler\"/>"
        liftIO $ threadDelay 1000000
    --yield "</users>" -- will never be reached

data User = User {name :: Text} deriving (Show)

parseUserRow :: MonadThrow m => Consumer Event m (Maybe User)
parseUserRow = tagName "row" (requireAttr "DisplayName" <* ignoreAttrs) $ \displayName -> do
    return $ User displayName

parseUsers :: MonadThrow m => Conduit Event m User
parseUsers = void $ tagNoAttr "users" $ yieldWhileJust parseUserRow

yieldWhileJust :: Monad m
               => ConduitM a b m (Maybe b)
               -> Conduit a m b
yieldWhileJust consumer =
    loop
  where
    loop = do
        mx <- consumer
        case mx of
            Nothing -> return ()
            Just x -> yield x >> loop

main :: IO ()
main = infiniteInput
    $$ parseBytes def
    =$ parseUsers
    =$ CL.mapM_ print
于 2014-01-27T07:46:48.870 回答
2

基于Michael Snoyman 的出色回答stackoverflow.com-Users.7z,这里是一个修改版本,它从人工生成的 IO 流中读取数据,而不是从其中获取数据。

有关如何xml-conduit直接使用的参考,请参阅Michael 的回答。此答案仅作为示例提供,说明如何在可选压缩文件上使用此处描述的方法。

这里主要的变化是需要使用runResourceT来读取文件,最终print需要lifted from IO ()toResourceT IO ()

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
import qualified Data.Conduit.Binary as CB
import           Control.Applicative    ((<*))
import           Control.Concurrent     (threadDelay)
import           Control.Monad          (forever, void)
import           Control.Monad.IO.Class (MonadIO (liftIO))
import           Data.ByteString        (ByteString)
import qualified Data.ByteString.Lazy as LB
import           Data.Conduit
import qualified Data.Conduit.List      as CL
import           Data.Text              (Text)
import           Data.Text.Encoding     (encodeUtf8)
import           Data.XML.Types         (Event)
import           Text.XML.Stream.Parse
import           Data.Conduit.BZlib (bunzip2)
import           Control.Monad.Trans.Class (lift)
import           Control.Monad.Trans.Resource (MonadThrow, runResourceT)

data User = User {name :: Text} deriving (Show)

parseUserRow :: MonadThrow m => Consumer Event m (Maybe User)
parseUserRow = tagName "row" (requireAttr "DisplayName" <* ignoreAttrs) $ \displayName -> do
    return $ User displayName

parseUsers :: MonadThrow m => Conduit Event m User
parseUsers = void $ tagNoAttr "users" $ yieldWhileJust parseUserRow

yieldWhileJust :: Monad m
               => ConduitM a b m (Maybe b)
               -> Conduit a m b
yieldWhileJust consumer =
    loop
  where
    loop = do
        mx <- consumer
        case mx of
            Nothing -> return ()
            Just x -> yield x >> loop

main :: IO ()
main = runResourceT $ CB.sourceFile "stackoverflow.com-Users.7z" $= bunzip2 $$ parseBytes def
    =$ parseUsers
    =$ CL.mapM_ (lift . print)
于 2014-01-29T03:03:08.693 回答
2

进行了编辑以更新M. Snoyman的富有洞察力的示例,但它被平庸的权力绊脚石所抛弃。因此,这。

原版将不再编译并产生许多已弃用的警告(遗留语法)。

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
import           Control.Applicative    ((<*))
import           Control.Concurrent     (threadDelay)
import           Control.Monad          (forever, void)
import           Control.Monad.Catch    (MonadThrow)
import           Control.Monad.IO.Class (MonadIO (liftIO))
import           Data.ByteString        (ByteString)
import           Data.Conduit
import qualified Data.Conduit.List      as CL
import           Data.Text              (Text)
import           Data.Text.Encoding     (encodeUtf8)
import           Data.XML.Types         (Event)
import           Text.XML.Stream.Parse

-- instead of actually including a large input data file, just for testing purposes
infiniteInput :: MonadIO m => ConduitT () ByteString m ()
infiniteInput = do
    yield "<users>"
    forever $ do
        yield $ encodeUtf8
            "<row id=\"1\" DisplayName=\"StackOverflow\"/><row id=\"2597135\" DisplayName=\"Uli Köhler\"/>"
        liftIO $ threadDelay 1000000
    --yield "</users>" -- will never be reached

data User = User {name :: Text} deriving (Show)

parseUserRow :: MonadThrow m => forall o. ConduitT Event o m (Maybe User)
parseUserRow = tag' "row" (requireAttr "DisplayName" <* ignoreAttrs) $ \displayName -> do
    return $ User displayName

parseUsers :: MonadThrow m => ConduitT Event User m ()
parseUsers = void $ tagNoAttr "users" $ manyYield parseUserRow

--or use manyYield, now provided by Text.XML.Stream.Parse
yieldWhileJust :: Monad m
               => ConduitT a b m (Maybe b)
               -> ConduitT a b m ()
yieldWhileJust consumer =
    loop
  where
    loop = do
        mx <- consumer
        case mx of
            Nothing -> return ()
            Just x -> yield x >> loop

main :: IO ()
main = runConduit $ infiniteInput
    .| parseBytes def
    .| parseUsers
    .| CL.mapM_ print

ghc 8.6.5,xml 管道 1.9.0.0

于 2020-01-10T13:21:07.043 回答