我认为嵌套迭代是正确的方法,但是这种情况有一些独特的问题,使其与大多数常见示例略有不同。
块和组
第一个问题是正确获取数据源。基本上你所描述的逻辑划分会给你一个相当于[[ByteString]]
. 如果您创建一个枚举器来直接生成它,则流中的每个元素都将是一组完整的块,您可能希望避免(出于内存原因)。您可以将所有内容扁平化为单个[ByteString]
,但随后您需要重新引入边界,这将是非常浪费的,因为数据库正在为您做这件事。
现在忽略组流,看来您需要自己将数据分成块。我将其建模为:
enumGroup :: Enumerator ByteString IO a
enumGroup = enumFromCallback cb ()
where
cb () = do
(code, data) <- getResultData
case code of
OPERATION_SUCCEEDED -> return $ Right ((True, ()), data)
NO_MORE_DATA -> return $ Right ((False, ()), data)
GET_DATA_FAILED -> return $ Left MyException
由于块的大小是固定的,因此您可以轻松地使用Data.Iteratee.group
.
enumGroupChunked :: Iteratee [ByteString] IO a -> IO (Iteratee ByteString IO a)
enumGroupChunked = enumGroup . joinI . group groupSize
比较 this 的类型Enumerator
type Enumerator s m a = Iteratee s m a -> m (Iteratee s m a)
所以enumGroupChunked
基本上是一个花哨的枚举器,它改变了流类型。这意味着它需要一个 [ByteString] 迭代消费者,并返回一个使用普通字节串的迭代消费者。通常枚举器的返回类型并不重要。它只是一个迭代器,您可以使用run
(or tryRun
) 评估以获取输出,因此您可以在此处执行相同操作:
evalGroupChunked :: Iteratee [ByteString] IO a -> IO a
evalGroupChunked i = enumGroupChunked i >>= run
如果您对每个组都有更复杂的处理,那么最简单的地方就是在enumGroupChunked
函数中。
组流
现在这已经不碍事了,如何处理组流?答案取决于你想如何消费它们。如果您想基本上独立处理流中的每个组,我会做类似的事情:
foldStream :: Iteratee [ByteString] IO a -> (b -> a -> b) -> b -> IO b
foldStream iter f acc0 = do
val <- evalGroupChunked iter
res <- getNextItem
case res of
OPERATION_SUCCEEDED -> foldStream iter f $! f acc0 val
NO_MORE_DATA -> return $ f acc0 val
GET_DATA_FAILED -> error "had a problem"
但是,假设您想要对整个数据集进行某种流处理,而不仅仅是单个组。也就是说,你有一个
bigProc :: Iteratee [ByteString] IO a
你想在整个数据集上运行。这就是枚举器的返回迭代器有用的地方。一些早期的代码现在会略有不同:
enumGroupChunked' :: Iteratee [ByteString] IO a
-> IO (Iteratee ByteString IO (Iteratee [ByteString] IO a))
enumGroupChunked' = enumGroup . group groupSize
procStream :: Iteratee [ByteString] IO a -> a
procStream iter = do
i' <- enumGroupChunked' iter >>= run
res <- getNextItem
case res of
OPERATION_SUCCEEDED -> procStream i'
NO_MORE_DATA -> run i'
GET_DATA_FAILED -> error "had a problem"
嵌套迭代器(即Iteratee s1 m (Iteratee s2 m a)
)的这种用法有点不常见,但当您想要顺序处理来自多个枚举器的数据时,它特别有用。关键是要认识到run
ing 外部 iteratee 会给你一个准备好接收更多数据的 iteratee。这是一个在这种情况下运行良好的模型,因为您可以独立枚举每个组,但将它们作为单个流处理。
一个警告:内部迭代将处于任何状态。假设一个组的最后一个块可能小于一个完整的块,例如
Group A Group B Group C
1024, 1024, 512 1024, 1024, 1024 1024, 1024, 1024
在这种情况下会发生什么,因为group
将数据组合成大小为 1024 的块,它会将 A 组的最后一个块与 B 组的前 512 个字节组合起来。这不是foldStream
示例的问题,因为该代码终止内部迭代(带joinI
)。这意味着这些群体是真正独立的,所以你必须这样对待他们。如果要像 中那样组合组procStream
,则必须考虑整个流。如果这是您的情况,那么您将需要使用比group
.
Data.Iteratee 与 Data.Enumerator
在不讨论任何一个包的优点的情况下,更不用说IterIO(我承认我有偏见),我想指出我认为两者之间最显着的区别:流的抽象。
在 Data.Iteratee 中,消费者Iteratee ByteString m a
对某个长度的名义 ByteString 进行操作,一次可以访问单个块ByteString
。
在 Data.Enumerator 中,消费者Iteratee ByteString m a
对一个名义上的 [ByteString] 进行操作,一次可以访问一个或多个元素(字节串)。
这意味着大多数 Data.Iteratee 操作都是以元素为中心的,也就是说,Iteratee ByteString
它们将在单个 上Word8
操作,而 Data.Enumerator 操作以块为中心,在 a 上操作ByteString
。
你可以想到Data.Iteratee.Iteratee [s] m a
=== Data.Enumerator.Iteratee s m a
。