5

我有一个分页资源,我想用 Monix 递归地使用它。我想要一个 Observable,它会发出下载的元素并递归地使用页面。这是一个简单的例子。它当然行不通。它发出第一页,然后是第一页+第二页,然后是第一页+第二页+第三页。我希望它首先发射,然后是第二个,然后是第三个,依此类推。

object Main extends App {

  sealed trait Event
  case class Loaded(xs: Seq[String]) extends Event
  // probably should just finish stream instead of this event
  case object Done extends Event

  // here is the problem
  def consume(page: Int, size: Int):Observable[Event] = {
    Observable.fromFuture(getPaginatedResource(page, size)).concatMap{ xs =>
      if (xs.isEmpty) Observable.pure(Done)
      else Observable.concat(Observable.pure(Loaded(xs)), consume(page + 1, size + 5))
    }
  }

  def getPaginatedResource(page: Int, size: Int):Future[Seq[String]] = Future {
    if (page * size > 100) Seq.empty
    else 0 to size map (x => s"element $x")
  }

  consume(page = 0, size = 5).foreach(println)

}

有任何想法吗?

UPD 对不起,它似乎在工作,我只是有一个错误size + 5。所以看起来问题已经解决了,但是如果你发现我做错了什么,请告诉我。

4

1 回答 1

1

通常建议在使用时尽可能避免递归Observable。因为,它不容易可视化并且通常更容易出错。

一个想法是使用scanEvalF,因为它会在每一步发出项目。

sealed trait Event
object Event {
  case class Loaded(page: Int, size: Int, items: Seq[String]) extends Event
}

def getPaginatedResource(page: Int, size: Int): Task[Loaded] = Task.pure {
  if (page * size > 100) Loaded(page, size, Seq.empty)
  else Loaded(page, size, 0.to(size).map(x => s"element $x"))
}

def consume(page: Int, size: Int): Observable[Event] = {
  Observable
    .interval(0.seconds)
    .scanEvalF(getPaginatedResource(page, size)) { (xs, _) =>
      getPaginatedResource(xs.page + 1, xs.size + 5)
    } // will emit items on each step
    .takeWhileInclusive(_.items.nonEmpty) // only take until list is empty
}

consume(0, 5)
  .foreachL(println)
  .runToFuture
于 2019-11-17T04:59:42.087 回答