2

假设您要将来自某个流的事件存储到数据库中,并且该数据库的客户端库是异步的。

例如,当发出下一个事件时writeEvent(event: MyEvent): Future[Boolean],您必须在观察者内部调用一个方法。onNext除了阻塞之外,还有什么好方法可以做到这一点Future

我目前看到的关于如何实现这一点的唯一方法是创建一些自定义Scheduler,允许我将线程返回到池中,直到内部onNext的异步代码完成。

4

1 回答 1

2

您不想在onNext订阅者回调中进行这样的阻塞,这会破坏 Rx。它可以以更惯用的方式链接在一起。

我与 Futures 合作不多,但我想知道是否Observable.from(Future)可行:

someStream
    .flatMap(evt => Observable.from(writeEvent(evt)))
    .subscribe(
        next => ...,
        err => ...
)
于 2016-10-23T06:45:12.780 回答