What I am trying to do is use takeWhile to split a ByteString at a particular character.

import qualified Data.ByteString.Internal as BS (c2w, w2c)
import Pipes
import Pipes.ByteString as PB
import Pipes.GZip
import Pipes.Prelude as PP
import System.IO

newline = BS.c2w '\n'

splitter = PB.takeWhile (\myWord -> myWord /= newline)

myPipe fileHandle = PP.toListM $ decompress fileProducer >-> splitter
  where
    fileProducer = PB.fromHandle fileHandle       

run = do
  dat <- withFile "somefile.blob" ReadMode myPipe
  pure dat

This gets me the first line, but what I really want is to efficiently yield each newline-delimited chunk, one at a time. How do I do that?

2 Answers

@Michael's answer is good. I just want to illustrate some usage patterns that are going on here.

( .lhs available at http://lpaste.net/165352 )

First a few imports:

 {-# LANGUAGE OverloadedStrings, NoMonomorphismRestriction #-}

 import Pipes
 import qualified Pipes.Prelude as PP
 import qualified Pipes.Group as PG
 import qualified Pipes.ByteString as PB
 import qualified Pipes.GZip as GZip
 import qualified Data.ByteString as BS
 import Lens.Family (view, over)
 import Control.Monad
 import System.IO

If you look over the functions in Pipes.ByteString and Pipes.GZip, you'll see that they all fall into the following type schemas:

  1. Producer ... -> FreeT (Producer ...) ...
  2. FreeT (Producer ...) ... -> Producer ...
  3. Lens' (Producer ...) (FreeT (Producer ...) ...)
  4. Producer ... -> Producer ...

Examples of functions in each category:

  1. PB.words
  2. PG.concats
  3. PB.lines, PB.chunksOf, PB.splits, ...
  4. GZip.compress, GZip.decompress
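To make the schemas concrete, here is a minimal sketch that pins one representative of categories 1 to 3 to a fully spelled-out type. The names splitWords, joinSegments, splitLines, wordBytes, and lineBytes are made up for illustration, and the signatures assume the pipes-bytestring 2.x and pipes-group 1.x APIs, where PB.words is a plain function and PB.lines is a Lens'. Category 4 is left out because it is ordinary function application.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Lens.Family (view)
import Pipes (Producer, yield)
import qualified Pipes.ByteString as PB
import qualified Pipes.Group as PG
import qualified Pipes.Prelude as PP

-- Category 1: a Producer goes in, a succession of Producers comes out.
splitWords :: Monad m => Producer ByteString m r -> PG.FreeT (Producer ByteString m) m r
splitWords = PB.words

-- Category 2: a succession of Producers is flattened back into one Producer.
joinSegments :: Monad m => PG.FreeT (Producer ByteString m) m r -> Producer ByteString m r
joinSegments = PG.concats

-- Category 3: a lens; 'view' turns it into a category 1 function.
splitLines :: Monad m => Producer ByteString m r -> PG.FreeT (Producer ByteString m) m r
splitLines = view PB.lines

-- The separators are dropped by the split, so joining the segments gives
-- back the input bytes minus the whitespace (words) or newlines (lines).
wordBytes, lineBytes :: ByteString
wordBytes = BS.concat (PP.toList (joinSegments (splitWords (yield "ab cd\nef"))))  -- "abcdef"
lineBytes = BS.concat (PP.toList (joinSegments (splitLines (yield "ab cd\nef"))))  -- "ab cdef"
```

Loading this in GHCi and evaluating wordBytes and lineBytes is a quick way to check which separators each split removes.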

Here's how to use PB.words to split an input stream into words:

 prod = yield "this is\na test\nof the pipes\nprocessing\nsystem"

 t1 = runEffect $ (PG.concats . PB.words) prod >-> PP.print

To use a function of type 3 (e.g. PB.lines), apply view to the Lens' to get a function of type 1, then compose it with PG.concats:

 t2a = runEffect $ (PG.concats . view PB.lines) prod >-> PP.print

 t2b h = (PG.concats . view PB.lines) (PB.fromHandle h) >-> PP.print

 run2 = withFile "input" ReadMode (runEffect . t2b)
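One caveat worth checking against the original question: as far as I can tell, PG.concats only removes the newline bytes and passes the remaining chunks through unchanged, so a line that spans two input chunks still arrives in two pieces. If each line is needed as one strict ByteString, PG.folds can accumulate every segment instead. A sketch under that assumption (the names twoChunks, asChunks, and asLines are hypothetical):

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.ByteString (ByteString)
import Lens.Family (view)
import Pipes (Producer, yield)
import qualified Pipes.ByteString as PB
import qualified Pipes.Group as PG
import qualified Pipes.Prelude as PP

-- The line "abc" is deliberately split across two input chunks.
twoChunks :: Monad m => Producer ByteString m ()
twoChunks = yield "ab" >> yield "c\nd"

-- concats keeps the incoming chunk boundaries and only drops the newline.
asChunks :: [ByteString]
asChunks = PP.toList ((PG.concats . view PB.lines) twoChunks)
-- ["ab","c","d"]

-- folds collapses each line segment into a single strict ByteString.
asLines :: [ByteString]
asLines = PP.toList ((PG.folds (<>) mempty id . view PB.lines) twoChunks)
-- ["abc","d"]
```

Appending strict ByteStrings with (<>) copies on every step, so this is only reasonable when individual lines are short.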

For a Producer -> Producer function, just use normal function application:

 t3 h = GZip.decompress (PB.fromHandle h) >-> PP.print

 run3 = withFile "input.gz" ReadMode (runEffect . t3)

 t4 h = GZip.decompress (PB.fromHandle h) >-> PP.map BS.length >-> PP.print

 run4 = withFile "big.gz" ReadMode (runEffect . t4)

To first decompress and then split by lines, we nest function application:

 t5 h = (PG.concats . view PB.lines) ( GZip.decompress (PB.fromHandle h) )
          >-> PP.map BS.length >-> PP.print

 run5 = withFile "input.gz" ReadMode (runEffect . t5)
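The imports at the top bring in over, which none of the examples above use. As a further sketch, over applies a FreeT-to-FreeT transformation through the lens and reassembles a Producer, so whole lines can be selected without ever collecting them. The names firstLines and demoFirstLines are made up for illustration; PG.takes comes from pipes-group.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Lens.Family (over)
import Pipes (Producer, yield)
import qualified Pipes.ByteString as PB
import qualified Pipes.Group as PG
import qualified Pipes.Prelude as PP

-- Keep only the first n lines of a byte stream; the lens splits the stream
-- into lines, PG.takes keeps n of them, and the lens joins them again.
firstLines :: Monad m => Int -> Producer ByteString m () -> Producer ByteString m ()
firstLines n = over PB.lines (PG.takes n)

-- Collect the result of a small in-memory example for inspection.
demoFirstLines :: ByteString
demoFirstLines = BS.concat (PP.toList (firstLines 2 (yield "a\nb\nc\n")))
-- "a\nb\n"
```

Unlike PG.concats . view PB.lines, going through over keeps the newlines, because the lens puts one back after each line when it rebuilds the Producer.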

Answered 2016-06-04T20:27:11.433

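pipes-bytestring and pipes-group are arranged so that repeatedly breaking a Producer ByteString m r yields a FreeT (Producer ByteString m) m r. FreeT can be read here as A_Succession_Of, so the result can be thought of as "a succession of bytestring-producer segments returning an r". This way, if one of the segments is 10 GB long, we still have streaming rather than a 10 GB strict bytestring.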

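It looks to me like you want to break the bytestring producer on newlines, but I couldn't tell whether you want to keep the newlines. If you throw them out, this is the same as splitting the bytestring producer with view PB.lines and then concatenating each of the subordinate producers into a single strict bytestring: the individual line. I write this below as accumLines. It is straightforward, but it makes a small use of Lens.view to turn the fancy PB.lines lens into a regular function. (Many operations in pipes-bytestring are written as lenses because that way they can be reused for other purposes, especially the kind of producer parsing that pipes favors.)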

import Pipes
import qualified Pipes.Prelude as P
import Pipes.ByteString as PB
import qualified Pipes.Group as PG
import Pipes.GZip

import qualified Data.ByteString.Internal as BS (c2w, w2c)

import System.IO
import Lens.Simple (view) -- or Control.Lens or whatever
import Data.Monoid

main = run >>= mapM_ print

myPipe fileHandle = P.toListM $ accumLines (decompress fileProducer)
  where
    fileProducer = PB.fromHandle fileHandle

run = do
  dat <- withFile "a.gz" ReadMode myPipe
  pure dat

-- little library additions

accumLines :: Monad m => Producer ByteString m r -> Producer ByteString m r
accumLines = mconcats . view PB.lines 

accumSplits :: Monad m => Char -> Producer ByteString m r -> Producer ByteString m r
accumSplits c  = mconcats . view (PB.splits (BS.c2w c)) 

-- this is convenient, but the operations above could 
-- be more rationally implemented using e.g. BL.fromChunks and toListM 
mconcats :: (Monad m, Monoid b) => FreeT (Producer b m) m r -> Producer b m r
mconcats = PG.folds (<>) mempty id

Ideally you would not build a new bytestring at every line break. Whether you have to depends on what you are going to do with the lines.
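As an illustration of that last point, here is a minimal sketch of a case where no per-line bytestring is needed: if only a summary of each line matters (here its length in bytes), PG.folds can reduce every line segment chunk by chunk. The names lineLengths and demoLengths are made up for this example.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Lens.Family (view)
import Pipes (Producer, yield)
import qualified Pipes.ByteString as PB
import qualified Pipes.Group as PG
import qualified Pipes.Prelude as P

-- Yield the byte length of each line. Only a running Int is kept per line;
-- the chunks that make up the line are never concatenated.
lineLengths :: Monad m => Producer ByteString m r -> Producer Int m r
lineLengths = PG.folds (\n chunk -> n + BS.length chunk) 0 id . view PB.lines

-- The first line, "abc", arrives split across two chunks.
demoLengths :: [Int]
demoLengths = P.toList (lineLengths (yield "ab" >> yield "c\nd"))
-- [3,1]
```

The same shape works for any left fold over a line's chunks, such as a checksum or a count of some byte, and it keeps memory use bounded even when a single line is very long.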

Answered 2016-06-04T18:50:43.890