我想Observable
从列表的结果中实时生成Futures
.
在最简单的情况下,假设我有一个我正在运行的期货列表Future.sequence
,并且我只是在监视它们的进度,并在Observable
每次完成时告诉我。我基本上是这样做的:
def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = {
Observable[String](observer => {
val loudFutures: List[Future[Int]] = futs.map(f => {
f onComplete {
case Success(a) => observer.onNext(s"just did $a more")
case Failure(e) => observer.onError(e)
}
f
})
Future.sequence(loudFutures) onComplete {
case Success(_) => observer.onCompleted()
case Failure(e) => observer.onError(e)
}
})
}
这在我的测试环境中运行良好。但我刚刚读到onNext
不应该从不同的线程调用,至少没有注意没有重叠的调用。解决此问题的推荐方法是什么?似乎许多现实世界的使用Observables
都需要onNext
从这样的异步代码中调用,但我在文档中找不到类似的示例。