
I'm trying to read a group of up to 50 items from a pipe and process them in an IO action all at once. (The use case is that I'm inserting data into a database and want to do each batch inside a single transaction, because that is vastly more efficient.) Here is a simplified version of what I've got so far:

import Control.Monad (replicateM)
import Pipes

type ExampleType = Int

doSomething :: [ExampleType] -> IO ()
doSomething = undefined

inGroupsOf50 :: Producer ExampleType IO () -> IO ()
inGroupsOf50 input =
    runEffect $ input >-> loop
  where
    loop = do
      entries <- replicateM 50 await
      lift $ doSomething entries  -- Insert a bunch all in one transaction
      loop

The problem is that, as far as I can tell, unless the number of items to insert happens to be divisible by 50, I'm going to miss some. What I really want instead of replicateM 50 await is something that gives me up to 50 items, or fewer if the input ends, but I can't quite figure out how to write that.
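For comparison, here is the desired batching behaviour stated on plain lists rather than on a Producer. This is a minimal sketch using only base; chunksOfList is a hypothetical helper name, not part of pipes. Every item ends up in some batch, and only the final batch may be shorter than the requested size.

```haskell
-- Hypothetical helper: split a list into groups of at most n elements.
-- The last group may be shorter, so no trailing items are dropped
-- (assumes n > 0).
chunksOfList :: Int -> [a] -> [[a]]
chunksOfList _ [] = []
chunksOfList n xs = batch : chunksOfList n rest
  where
    (batch, rest) = splitAt n xs
```

For example, 120 items come out as batches of 50, 50 and 20.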

I've been thinking that pipes-parse might be the right library to look at. draw looks like it has a promising signature, but so far the pieces aren't fitting together in my head. I have a producer and I'm writing a consumer, and I don't really see how that relates to the concept of a parser.


1 Answer


Even more than pipes-parse, you probably want to look at pipes-group. In particular, let's examine the function

-- this type is slightly specialized
chunksOf 
  :: Monad m => 
     Int -> 
     Lens' (Producer a m x) (FreeT (Producer a m) m x)

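The Lens' part might look intimidating, but it can be dealt with quickly: it says that we can convert a Producer a m x into a FreeT (Producer a m) m x [0], for instance with view: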

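import Control.Lens (view)
import Control.Monad.Trans.Free (FreeT)
import Pipes (Producer)
import Pipes.Group (chunksOf)

-- Split a Producer into a stream of sub-Producers of at most n elements each
chunkIt :: Monad m => Int -> Producer a m x -> FreeT (Producer a m) m x
chunkIt n = view (chunksOf n)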

So now we have to figure out what to do with that FreeT part. In particular, we'll want to dig into the free package and pull out the function iterT:

iterT
  :: (Functor f, Monad m) => 
     (f (m a) -> m a) -> 
     (FreeT f m a -> m a)

This function, iterT, lets us "consume" a FreeT one "step" at a time. To understand this, we'll first specialize the type of iterT, replacing f with Producer a m:

runChunk :: Monad m =>
            (Producer a m (m x)       -> m x) ->
            (FreeT (Producer a m) m x -> m x)
runChunk = iterT

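In particular, runChunk can "run" a FreeT full of Producers, as long as we tell it how to turn a Producer a m (m x) into an m-action. That may start to look more familiar. When we define the first argument of runChunk, we just have to run a Producer, which in this case yields no more than the chosen number of elements.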

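But what's up with the effectful return value m x? It's the "continuation", i.e. all of the chunks that come after the current one you're writing. So, for instance, suppose we have a Producer of Chars and we want to print a newline after every 3 characters: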

main :: IO ()
main = flip runChunk (chunkIt 3 input) $ \p -> _

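At this point the _ hole has type IO (), with p :: Producer Char IO (IO ()) in context. We can consume this pipe with for, collect its return value (which, again, is the continuation), emit a newline, and then run the continuation.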

input :: Monad m => Producer Char m ()
input = each "abcdefghijklmnopqrstuvwxyz"

main :: IO ()
main = flip runChunk (chunkIt 3 input) $ \p -> do
  cont <- runEffect $ for p (lift . putChar)
  putChar '\n'
  cont

This works exactly as expected:

λ> main
abc
def
ghi
jkl
mno
pqr
stu
vwx
yz
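The continuation-passing shape that iterT relies on may be easier to see without pipes. Below is a pure, base-only analogue, not pipes-group's actual implementation: Chunks, iterChunks, chunksFromList, printChunks and collectChunks are hypothetical names, and unlike the real FreeT (Producer a m) m x, this sketch does not let the elements of a chunk be produced effectfully.

```haskell
-- Hypothetical, simplified stand-in for FreeT (Producer a m) m x:
-- either the stream has finished with a result, or there is one chunk
-- of elements plus an action that yields the rest of the stream.
data Chunks a m x
  = Done x
  | Chunk [a] (m (Chunks a m x))

-- Analogue of iterT: the step function gets the current chunk together
-- with the continuation, an action that handles every later chunk.
iterChunks :: Monad m => ([a] -> m x -> m x) -> Chunks a m x -> m x
iterChunks _ (Done x) = return x
iterChunks step (Chunk chunk rest) = step chunk (rest >>= iterChunks step)

-- Split a list into chunks of at most n elements (assumes n > 0).
chunksFromList :: Monad m => Int -> [a] -> Chunks a m ()
chunksFromList _ [] = Done ()
chunksFromList n xs = Chunk batch (return (chunksFromList n rest))
  where
    (batch, rest) = splitAt n xs

-- Mirrors the answer's main: print one chunk, then run the continuation.
printChunks :: Int -> String -> IO ()
printChunks n s =
  iterChunks (\chunk cont -> putStrLn chunk >> cont) (chunksFromList n s)

-- Collect the chunks in order by running in base's writer-like
-- tuple monad, where (w, ()) >> (w', x) == (w <> w', x).
collectChunks :: Chunks a ((,) [[a]]) x -> [[a]]
collectChunks = fst . iterChunks (\chunk cont -> ([chunk], ()) >> cont)
```

printChunks 3 ['a' .. 'z'] prints the same nine lines as main above: each step handles its own chunk and then decides when to run the continuation, which is exactly the role of the m x return value.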

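To be clear, although I've spent a while on exposition, this is fairly simple code once you see how all the pieces fit together. Here is the whole listing: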

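import Control.Lens ((^.))
import Control.Monad.Trans.Free (iterT)
import Pipes
import Pipes.Group (chunksOf)

input :: Monad m => Producer Char m ()
input = each "abcdefghijklmnopqrstuvwxyz"

-- Print each chunk of at most 3 characters, then a newline, then run the rest
main :: IO ()
main = flip iterT (input ^. chunksOf 3) $ \p -> do
  cont <- runEffect $ for p $ \c -> do
    lift (putChar c)
  putChar '\n'
  cont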
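Applied to the question's database batching, the same pattern might look like the following sketch. It assumes the pipes, pipes-group, free and lens packages are installed; inGroupsOf is a hypothetical name, and using Pipes.Prelude.toListM to gather each chunk into a list is a choice made here, not something shown in the answer above.

```haskell
import Control.Lens (view)
import Control.Monad.Trans.Free (iterT)
import Pipes
import Pipes.Group (chunksOf)
import qualified Pipes.Prelude as P

-- Hypothetical helper: run a batch action on every group of at most n items.
-- The last group may be shorter than n, so trailing items are not dropped.
inGroupsOf :: Monad m => Int -> ([a] -> m ()) -> Producer a m () -> m ()
inGroupsOf n action input = iterT step (view (chunksOf n) input)
  where
    -- Each step receives one chunk (a Producer of at most n items) whose
    -- return value is the continuation that processes the remaining chunks.
    step chunk = do
      (entries, cont) <- P.toListM chunk
      action entries  -- e.g. insert the whole batch in one transaction
      cont
```

With the question's definitions this would be inGroupsOf50 = inGroupsOf 50 doSomething. Because each chunk is handled before its continuation runs, batches are processed in order, and the final batch holds whatever is left when the input ends.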

[0] There's more to it than that, but this is enough for now.

answered 2014-12-29 at 18:40