我有一个分页资源,我想用 Monix 递归地使用它。我想要一个 Observable,它会发出下载的元素并递归地使用页面。这是一个简单的例子。它当然行不通。它发出第一页,然后是第一页+第二页,然后是第一页+第二页+第三页。我希望它首先发射,然后是第二个,然后是第三个,依此类推。
object Main extends App {
sealed trait Event
case class Loaded(xs: Seq[String]) extends Event
// probably should just finish stream instead of this event
case object Done extends Event
// here is the problem
def consume(page: Int, size: Int):Observable[Event] = {
Observable.fromFuture(getPaginatedResource(page, size)).concatMap{ xs =>
if (xs.isEmpty) Observable.pure(Done)
else Observable.concat(Observable.pure(Loaded(xs)), consume(page + 1, size + 5))
}
}
def getPaginatedResource(page: Int, size: Int):Future[Seq[String]] = Future {
if (page * size > 100) Seq.empty
else 0 to size map (x => s"element $x")
}
consume(page = 0, size = 5).foreach(println)
}
有任何想法吗?
UPD
对不起,它似乎在工作,我只是有一个错误size + 5
。所以看起来问题已经解决了,但是如果你发现我做错了什么,请告诉我。