3

假设我有一个类型为元素的事件流In

val observableIn: Observable[In] = ???

还有一个用于将 type 对象转换为 typeIn对象的函数Out,但“在未来”:

val futureInToFutureOut: (Future[In]) => Future[Out] = ???

此时我想observableIn根据我的功能转换我的元素futureInToFutureOut。也就是说,我想要一个类型为元素的事件流Out,与原始流的元素匹配,但通过 function 转换futureInToFutureOut

我认为这应该有效:

val observableOut: Observable[Out] = observableIn flatMap { in =>
  Observable.from(futureInToFutureOut(Future(in)))
}

这是正确的吗?有一个更好的方法吗?

4

1 回答 1

2

编辑:

据我所知,您的解决方案是正确的。如果您想稍微提高性能,请考虑:

val observableOut: Observable[Out] = observableIn.flatMap { in =>
  val p = Promise.successful(in)
  Observable.from(futureInToFutureOut(p.future))
}

这有点快,因为它不会创建异步计算来映射Future.apply未来

老的:

我将在下面留下我的旧建议,该建议仅在您将单个事件映射到Observable.

import scala.concurrent._
val promiseIn = Promise[Int]()
observableIn.foreach(x => promiseIn.trySuccess(x))
val observableOut = Observable.create { observer =>
  promiseIn.future.map(futureInToFutureOut).foreach { y =>
    observer.onNext(y)
    observer.onCompleted()
  }
}

解释

由于您是从一个Observable[In]对象(即事件流)开始的,因此您需要找到一种将事件从该对象转移Observable到未来的方法。创建新未来的典型方法是创建它的Promise第一个 -未来对象的输入端。然后,当第一个事件到达时,您可以使用foreachonObservable来调用未来:trySuccess

observableIn ---x---> foreach ---x---> promiseIn.trySuccess(x)

一旦事件Observable到达,promise 将异步完成。future我们现在可以通过调用它的方法来获取promise的读取端,即future ;然后呼唤map未来—— promiseIn.future.map(futureInToFutureOut)。图形化:

promiseIn.future ---x---> map ---f(x)---> futureOut

由此产生的未来是异步完成的futureInToFutureOut(x)。在这一点上,我们需要找到一种方法将这个值通过Observable[Out]. 创建新的典型方法Observable是调用Observable.create工厂方法。这个方法作为Observable' 的写作结束 - Observer,我们使用它来通过调用来发出事件onNext

futureOut ---f(x)---> foreach ---f(x)---> observer.onNext(f(x))

由于我们知道未来最多发出一个事件,因此我们调用onCompleted观察者的 on 来关闭输出Observable

编辑:如果您想掌握 Rx 和 Scala 期货,您可能需要考虑这本书,它涉及这些主题。免责声明:我是作者。

于 2015-07-13T18:14:59.410 回答