4

我正在使用一个特定的数据库,在成功查询后,您可以使用特定命令访问一组结果数据块:

getResultData :: IO (ResponseCode, ByteString)

现在 getResultData 将返回一个响应代码和一些数据,其中响应代码如下所示:

response = GET_DATA_FAILED | OPERATION_SUCCEEDED | NO_MORE_DATA

ByteString 是一个、一些或所有块:

数据 http://desmond.imageshack.us/Himg189/scaled.php?server=189&filename=chunksjpeg.png&res=medium

故事并没有到此结束。存在一个组流:

串流 http://desmond.imageshack.us/Himg695/scaled.php?server=695&filename=chunkgroupsjpeg.png&res=medium

一旦收到来自 getResultData 的 NO_MORE_DATA 响应,对 getNextItem 的调用将迭代流,允许我再次开始调用 getResultData。一旦 getNextItem 返回 STREAM_FINISHED,这就是她写的全部内容;我有我的数据。

现在,我希望使用 Date.Iteratee 或 Data.Enumerator 来重塑这种现象。就我现有的 Data.Iteratee 解决方案而言,它似乎非常幼稚,我觉得我应该使用嵌套的 iteratee 进行建模,而不是一个大的 iteratee blob,这是我的解决方案目前的实现方式。

我一直在看 Data.Iteratee 0.8.6.2 的代码,当谈到嵌套的东西时我有点困惑。

嵌套迭代是正确的行动方案吗?如果是这样,如何用嵌套的迭代来建模呢?

问候

4

1 回答 1

3

我认为嵌套迭代是正确的方法,但是这种情况有一些独特的问题,使其与大多数常见示例略有不同。

块和组

第一个问题是正确获取数据源。基本上你所描述的逻辑划分会给你一个相当于[[ByteString]]. 如果您创建一个枚举器来直接生成它,则流中的每个元素都将是一组完整的块,您可能希望避免(出于内存原因)。您可以将所有内容扁平化为单个[ByteString],但随后您需要重新引入边界,这将是非常浪费的,因为数据库正在为您做这件事。

现在忽略组流,看来您需要自己将数据分成块。我将其建模为:

enumGroup :: Enumerator ByteString IO a
enumGroup = enumFromCallback cb ()
 where
  cb () = do
    (code, data) <- getResultData
    case code of
        OPERATION_SUCCEEDED -> return $ Right ((True, ()), data)
        NO_MORE_DATA        -> return $ Right ((False, ()), data)
        GET_DATA_FAILED     -> return $ Left MyException

由于块的大小是固定的,因此您可以轻松地使用Data.Iteratee.group.

enumGroupChunked :: Iteratee [ByteString] IO a -> IO (Iteratee ByteString IO a)
enumGroupChunked = enumGroup . joinI . group groupSize

比较 this 的类型Enumerator

type Enumerator s m a = Iteratee s m a -> m (Iteratee s m a)

所以enumGroupChunked基本上是一个花哨的枚举器,它改变了流类型。这意味着它需要一个 [ByteString] 迭代消费者,并返回一个使用普通字节串的迭代消费者。通常枚举器的返回类型并不重要。它只是一个迭代器,您可以使用run(or tryRun) 评估以获取输出,因此您可以在此处执行相同操作:

evalGroupChunked :: Iteratee [ByteString] IO a -> IO a
evalGroupChunked i = enumGroupChunked i >>= run

如果您对每个组都有更复杂的处理,那么最简单的地方就是在enumGroupChunked函数中。

组流

现在这已经不碍事了,如何处理组流?答案取决于你想如何消费它们。如果您想基本上独立处理流中的每个组,我会做类似的事情:

foldStream :: Iteratee [ByteString] IO a -> (b -> a -> b) -> b -> IO b
foldStream iter f acc0 = do
  val <- evalGroupChunked iter
  res <- getNextItem
  case res of 
        OPERATION_SUCCEEDED -> foldStream iter f $! f acc0 val
        NO_MORE_DATA        -> return $ f acc0 val
        GET_DATA_FAILED     -> error "had a problem"

但是,假设您想要对整个数据集进行某种流处理,而不仅仅是单个组。也就是说,你有一个

bigProc :: Iteratee [ByteString] IO a

你想在整个数据集上运行。这就是枚举器的返回迭代器有用的地方。一些早期的代码现在会略有不同:

enumGroupChunked' :: Iteratee [ByteString] IO a
  -> IO (Iteratee ByteString IO (Iteratee [ByteString] IO a))
enumGroupChunked' = enumGroup . group groupSize

procStream :: Iteratee [ByteString] IO a -> a
procStream iter = do
  i' <- enumGroupChunked' iter >>= run
  res <- getNextItem
  case res of 
        OPERATION_SUCCEEDED -> procStream i'
        NO_MORE_DATA        -> run i'
        GET_DATA_FAILED     -> error "had a problem"

嵌套迭代器(即Iteratee s1 m (Iteratee s2 m a))的这种用法有点不常见,但当您想要顺序处理来自多个枚举器的数据时,它特别有用。关键是要认识到runing 外部 iteratee 会给你一个准备好接收更多数据的 iteratee。这是一个在这种情况下运行良好的模型,因为您可以独立枚举每个组,但将它们作为单个流处理。

一个警告:内部迭代将处于任何状态。假设一个组的最后一个块可能小于一个完整的块,例如

   Group A               Group B               Group C
   1024, 1024, 512       1024, 1024, 1024      1024, 1024, 1024

在这种情况下会发生什么,因为group将数据组合成大小为 1024 的块,它会将 A 组的最后一个块与 B 组的前 512 个字节组合起来。这不是foldStream示例的问题,因为该代码终止内部迭代(带joinI)。这意味着这些群体是真正独立的,所以你必须这样对待他们。如果要像 中那样组合组procStream,则必须考虑整个流。如果这是您的情况,那么您将需要使用比group.

Data.Iteratee 与 Data.Enumerator

在不讨论任何一个包的优点的情况下,更不用说IterIO(我承认我有偏见),我想指出我认为两者之间最显着的区别:流的抽象。

在 Data.Iteratee 中,消费者Iteratee ByteString m a对某个长度的名义 ByteString 进行操作,一次可以访问单个块ByteString

在 Data.Enumerator 中,消费者Iteratee ByteString m a对一个名义上的 [ByteString] 进行操作,一次可以访问一个或多个元素(字节串)。

这意味着大多数 Data.Iteratee 操作都是以元素为中心的,也就是说,Iteratee ByteString它们将在单个 上Word8操作,而 Data.Enumerator 操作以块为中心,在 a 上操作ByteString

你可以想到Data.Iteratee.Iteratee [s] m a=== Data.Enumerator.Iteratee s m a

于 2011-09-27T01:36:59.283 回答