4

我一直在尝试新的管道 http 包,我有一个想法。我有两个用于网页的解析器,一个返回行项目,另一个返回页面其他地方的数字。当我抓取页面时,最好将这些解析器串在一起并从同一个字节串生产者同时获取它们的结果,而不是两次获取页面或将所有 html 提取到内存中并解析两次。

换句话说,假设您有两个消费者:

c1 :: Consumer a m r1
c2 :: Consumer a m r2

是否可以制作这样的功能:

combineConsumers :: Consumer a m r1 -> Consumer a m r2 -> Consumer a m (r1, r2)
combineConsumers = undefined

我已经尝试了几件事,但我无法弄清楚。我知道如果不可能,但这会很方便。

编辑:

很抱歉,由于我对管道-attoparsec 的经验导致我提出了错误的问题,因此我对管道-attoparsec 做出了假设。当我假设它会返回一个管道使用者时,管道-attoparsec 将一个 attoparsec 转换为一个管道解析器。这意味着我实际上不能将两个 attoparsec 解析器转换为接受文本并返回结果的消费者,然后将它们与普通的旧管道生态系统一起使用。对不起,我只是不明白管道解析。

尽管这对我没有帮助,但 Arthur 的答案与我提出这个问题时所设想的差不多,而且我将来可能最终会使用他的解决方案。与此同时,我将使用导管。

4

5 回答 5

3

如果结果是“单曲线的”,您可以使用teePipes prelude 中的函数,结合WriterT.

{-# LANGUAGE OverloadedStrings #-}

import Data.Monoid
import Control.Monad
import Control.Monad.Writer
import Control.Monad.Writer.Class
import Pipes
import qualified Pipes.Prelude as P
import qualified Data.Text as T

textSource :: Producer T.Text IO ()
textSource = yield "foo" >> yield "bar" >> yield "foo" >> yield "nah"

counter :: Monoid w => T.Text 
                    -> (T.Text -> w) 
                    -> Consumer T.Text (WriterT w IO) ()
counter word inject = P.filter (==word) >-> P.mapM (tell . inject) >-> P.drain

main :: IO ()
main = do
    result <-runWriterT $ runEffect $ 
        hoist lift textSource >-> 
        P.tee (counter "foo" inject1) >-> (counter "bar" inject2)
    putStrLn . show $ result
    where
    inject1 _ = (,) (Sum 1) mempty
    inject2 _ = (,) mempty (Sum 1)

更新:正如评论中提到的,我看到的真正问题是pipes解析器不是Consumers. 如果两个解析器对剩余部分有不同的行为,你怎么能同时运行它们呢?如果其中一个解析器想要“取消绘制”某些文本而另一个解析器不想要,会发​​生什么?

一种可能的解决方案是以真正并发的方式在不同的线程中运行解析器。包中的原语pipes-concurrency允许您Producer通过将相同的数据写入两个不同的邮箱来“复制” a。然后每个解析器可以用自己的生产者副本做任何事情。这是一个也使用pipes-parse,pipes-attoparsecasync包的示例:

{-# LANGUAGE OverloadedStrings #-}

import Data.Monoid
import qualified Data.Text as T
import Data.Attoparsec.Text hiding (takeWhile)
import Data.Attoparsec.Combinator
import Control.Applicative
import Control.Monad
import Control.Monad.State.Strict
import Pipes
import qualified Pipes.Prelude as P
import qualified Pipes.Attoparsec as P
import qualified Pipes.Concurrent as P
import qualified Control.Concurrent.Async as A

parseChars :: Char -> Parser [Char] 
parseChars c = fmap mconcat $ 
    many (notChar c) *> many1 (some (char c) <* many (notChar c))

textSource :: Producer T.Text IO ()
textSource = yield "foo" >> yield "bar" >> yield "foo" >> yield "nah"

parseConc :: Producer T.Text IO () 
          -> Parser a 
          -> Parser b 
          -> IO (Either P.ParsingError a,Either P.ParsingError b)
parseConc producer parser1 parser2 = do
    (outbox1,inbox1,seal1) <- P.spawn' P.Unbounded
    (outbox2,inbox2,seal2) <- P.spawn' P.Unbounded
    feeding <- A.async $ runEffect $ producer >-> P.tee (P.toOutput outbox1) 
                                              >->        P.toOutput outbox2
    sealing <- A.async $ A.wait feeding >> P.atomically seal1 >> P.atomically seal2
    r <- A.runConcurrently $ 
        (,) <$> (A.Concurrently $ parseInbox parser1 inbox1)
            <*> (A.Concurrently $ parseInbox parser2 inbox2)
    A.wait sealing
    return r 
    where
    parseInbox parser inbox = evalStateT (P.parse parser) (P.fromInput inbox)

main :: IO ()
main = do
    (Right a, Right b) <- parseConc textSource (parseChars 'o')  (parseChars 'a')
    putStrLn . show $ (a,b) 

结果是:

("oooo","aa")

我不确定这种方法会带来多少开销。

于 2014-02-11T07:43:31.293 回答
2

由于达沃拉克在他的评论中提到的原因,我认为你处理这件事的方式有问题。但是如果你真的需要这样的功能,你可以定义它。

import Pipes.Internal
import Pipes.Core

zipConsumers :: Monad m => Consumer a m r -> Consumer a m s -> Consumer a m (r,s)
zipConsumers p q = go (p,q) where
  go (p,q) = case (p,q) of 
     (Pure r     , Pure s)      -> Pure (r,s)
     (M mpr      , ps)          -> M (do pr <- mpr
                                         return (go (pr, ps)))
     (pr         , M mps)       -> M (do ps <- mps
                                         return (go (pr, ps)))
     (Request _ f, Request _ g) -> Request () (\a -> go (f a, g a))
     (Request _ f, Pure s)      -> Request () (\a -> do r <- f a
                                                        return (r, s))
     (Pure r     , Request _ g) -> Request () (\a -> do s <- g a
                                                        return (r,s))
     (Respond x _, _          ) -> closed x
     (_          , Respond y _) -> closed y

如果您在不使用返回值的情况下“压缩”消费者,那么您只能使用它们的“效果”tee consumer1 >-> consumer2

于 2014-02-12T07:19:21.057 回答
2

惯用的解决方案是将您的 s 重写Consumer为 aFoldFoldMfoldl库中,然后使用Applicative样式将它们组合起来。然后,您可以将此组合折叠转换为适用于管道的折叠。

假设您有两个Folds:

fold1 :: Fold a r1
fold2 :: Fold a r2

...或两个FoldMs:

foldM1 :: Monad m => FoldM a m r1
foldM2 :: Monad m => FoldM a m r2

然后将它们组合成一个Fold/ FoldMusingApplicative样式:

import Control.Applicative

foldBoth :: Fold a (r1, r2)
foldBoth = (,) <$> fold1 <*> fold2

foldBothM :: Monad m => FoldM a m (r1, r2)
foldBothM = (,) <$> foldM1 <*> foldM2

-- or: foldBoth  = liftA2 (,) fold1  fold2
--     foldMBoth = liftA2 (,) foldM1 foldM2

您可以将 fold 转换为Pipes.Prelude-style fold 或Parser. 以下是必要的转换函数:

import Control.Foldl (purely, impurely)
import qualified Pipes.Prelude as Pipes
import qualified Pipes.Parse   as Parse

purely Pipes.fold
    :: Monad m => Fold a b -> Producer a m () -> m b

impurely Pipes.foldM
    :: Monad m => FoldM m a b -> Producer a m () -> m b

purely Parse.foldAll
    :: Monad m => Fold a b -> Parser a m r

impurely Parse.foldMAll
    :: Monad m => FoldM a m b -> Parser a m r

purely使用andimpurely函数的原因是, foldlandpipes可以互操作,而其中任何一个都不会产生对另一个的依赖。pipes此外,它们还允许(like )以外的库在没有依赖关系的情况下conduit重用foldl(提示提示,@MichaelSnoyman)。

我很抱歉没有记录此功能,主要是因为我花了一段时间才弄清楚如何以无依赖关系的方式获取pipesfoldl互操作,那是在我编写pipes教程之后。我将更新教程以指出这个技巧。

要了解如何使用foldl,只需阅读主模块中的文档即可。这是一个非常小且易于学习的库。

于 2014-02-12T10:17:59.990 回答
1

值得一提的是,在管道世界中,相关功能是zipSinks。可能有一些方法可以使此功能适用于管道,但自动终止可能会妨碍。

于 2014-02-11T06:32:26.740 回答
0

消费者形成一个单子,所以

combineConsumers = liftM2 (,)

将类型检查。不幸的是,语义可能与您期望的不同:第一个消费者将运行到完成,然后是第二个。

于 2014-02-11T01:33:55.267 回答