我通过调用一次一页(每页的记录数未知)从数据库API顺序读取大量记录def readPage(pageNumber: Int): Iterator[Record]
我试图以一种功能性的方式将这个 API 包装成类似Stream[Iterator[Record]]
或者Iterator[Iterator[Record]]
懒惰的方式,理想情况下没有可变状态,具有恒定的内存占用,这样我就可以将它视为无限的页面流或迭代器序列,并抽象掉来自客户端的分页。客户端可以迭代结果,通过调用 next() 它将检索下一页 (Iterator[Record])。
在 Scala 中实现这一点的最惯用和最有效的方法是什么。
编辑:需要一次一页地获取和处理记录,无法维护内存中所有页面的所有记录。如果一页失败,则抛出异常。大量页面/记录对于所有实际目的来说意味着无限。我想将它视为页面的无限流(或迭代器),每个页面都是有限数量记录的迭代器(例如,小于<1000,但如果时间提前,确切数量未知)。
我在Monix中查看了 BatchCursor,但它有不同的用途。
编辑 2:这是使用以下 Tomer 的答案作为起点的当前版本,但使用 Stream 而不是 Iterator。这允许根据https://stackoverflow.com/a/10525539/165130消除尾递归的需要,并且有 O(1) 时间进行流前置#::
操作(如果我们通过++
操作连接迭代器,它将是 O (n))
注意:虽然流被延迟评估,但流记忆仍可能导致内存爆炸,并且内存管理变得棘手。val
从to更改为def
在下面定义 Streamdef pages = readAllPages
似乎没有任何效果
def readAllPages(pageNumber: Int = 0): Stream[Iterator[Record]] = {
val iter: Iterator[Record] = readPage(pageNumber)
if (iter.isEmpty)
Stream.empty
else
iter #:: readAllPages(pageNumber + 1)
}
//usage
val pages = readAllPages
for{
page<-pages
record<-page
if(isValid(record))
}
process(record)
编辑 3:Tomer 的第二个建议似乎是最好的,它的运行时和内存占用与上述解决方案类似,但更简洁且容易出错。
val pages = Stream.from(1).map(readPage).takeWhile(_.nonEmpty)
注意: Stream.from(1)
创建一个从 1 开始并以 1 递增的流,它在API 文档中