5

我想Observable从列表的结果中实时生成Futures.

在最简单的情况下,假设我有一个我正在运行的期货列表Future.sequence,并且我只是在监视它们的进度,并在Observable每次完成时告诉我。我基本上是这样做的:

  def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = {
    Observable[String](observer => {
      val loudFutures: List[Future[Int]] = futs.map(f => {
            f onComplete {
              case Success(a) => observer.onNext(s"just did $a more")
              case Failure(e) => observer.onError(e)
            }
            f
        })
      Future.sequence(loudFutures) onComplete {
          case Success(_) => observer.onCompleted()
          case Failure(e) => observer.onError(e)
      }
    })
  }

这在我的测试环境中运行良好。但我刚刚读到onNext不应该从不同的线程调用,至少没有注意没有重叠的调用。解决此问题的推荐方法是什么?似乎许多现实世界的使用Observables都需要onNext从这样的异步代码中调用,但我在文档中找不到类似的示例。

4

1 回答 1

1

可观察合约

Observables 必须串行(而不是并行)向观察者发出通知。他们可能会从不同的线程发出这些通知,但通知之间必须有正式的先发生关系。

看看序列化

Observable 可以异步调用其观察者的方法,可能来自不同的线程。这可能会使这样的 Observable 违反 Observable 合同,因为它可能会尝试在其 OnNext 通知之一之前发送 OnCompleted 或 OnError 通知,或者它可能同时从两个不同的线程发出 OnNext 通知。你可以通过对它应用 Serialize 操作符来强制这样一个 Observable 表现良好和同步。

于 2016-03-13T02:28:38.550 回答