编辑:
据我所知,您的解决方案是正确的。如果您想稍微提高性能,请考虑:
val observableOut: Observable[Out] = observableIn.flatMap { in =>
val p = Promise.successful(in)
Observable.from(futureInToFutureOut(p.future))
}
这有点快,因为它不会创建异步计算来映射Future.apply
未来。
老的:
我将在下面留下我的旧建议,该建议仅在您将单个事件映射到Observable
.
import scala.concurrent._
val promiseIn = Promise[Int]()
observableIn.foreach(x => promiseIn.trySuccess(x))
val observableOut = Observable.create { observer =>
promiseIn.future.map(futureInToFutureOut).foreach { y =>
observer.onNext(y)
observer.onCompleted()
}
}
解释
由于您是从一个Observable[In]
对象(即事件流)开始的,因此您需要找到一种将事件从该对象转移Observable
到未来的方法。创建新未来的典型方法是创建它的Promise
第一个 -未来对象的输入端。然后,当第一个事件到达时,您可以使用foreach
onObservable
来调用未来:trySuccess
observableIn ---x---> foreach ---x---> promiseIn.trySuccess(x)
一旦事件Observable
到达,promise 将异步完成。future
我们现在可以通过调用它的方法来获取promise的读取端,即future ;然后呼唤map
未来—— promiseIn.future.map(futureInToFutureOut)
。图形化:
promiseIn.future ---x---> map ---f(x)---> futureOut
由此产生的未来是异步完成的futureInToFutureOut(x)
。在这一点上,我们需要找到一种方法将这个值通过Observable[Out]
. 创建新的典型方法Observable
是调用Observable.create
工厂方法。这个方法作为Observable
' 的写作结束 - Observer
,我们使用它来通过调用来发出事件onNext
:
futureOut ---f(x)---> foreach ---f(x)---> observer.onNext(f(x))
由于我们知道未来最多发出一个事件,因此我们调用onCompleted
观察者的 on 来关闭输出Observable
。
编辑:如果您想掌握 Rx 和 Scala 期货,您可能需要考虑这本书,它涉及这些主题。免责声明:我是作者。