背景
我有许多 RxJava Observables(要么从 Jersey 客户端生成,要么使用 stubs 生成Observable.just(someObject)
)。它们都应该只发出一个值。我有一个组件测试,它模拟了所有 Jersey 客户端和使用Observable.just(someObject)
,并且我看到了与运行生产代码时相同的行为。
我有几个类对这些 observables 起作用,执行一些计算(以及一些副作用——我以后可能会让它们直接返回值)并返回空的 void observables。
有一次,在一个这样的课程中,我试图压缩我的几个源 observables 然后映射它们 - 如下所示:
public Observable<Void> doCalculation() {
return Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
}
// in Unifying Object
public Observable<Void> processToNewObservable() {
// ... do some calculation ...
return Observable.empty();
}
然后将计算类全部组合并等待:
// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
.toBlocking().lastOrDefault(null);
问题
问题是,processToNewObservable()
永远不会被执行。通过消除过程,我可以看到这就是getObservable1()
麻烦所在 - 如果我将其替换为Observable.just(null)
,一切都会按照我的想象执行(但在我想要一个真实的地方使用空值)。
重申一下,在生产代码中从 Jersey 客户端返回一个 Observable,但该客户端是在我的测试中getObservable1()
返回的 Mockito 模拟。Observable.just(someValue)
调查
如果我转换getObservable1()
为阻塞,然后将第一个值包装在 中just()
,一切都会按照我的想象执行(但我不想引入阻塞步骤):
Observable.zip(
Observable.just(getObservable1().toBlocking().first()),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
我的第一个想法是,也许其他东西正在消耗我的 observable 发出的值,并且zip
看到它已经完成,因此确定压缩它们的结果应该是一个空的 observable。但是,我尝试添加.cache()
到我认为相关的每个可观察源,但这并没有改变行为。
我还尝试在 zip 之前在 getObservable1 上添加 next / error / complete / finally 处理程序(不将其转换为阻塞),但它们都没有执行:
getObservable1()
.doOnNext(...)
.doOnCompleted(...)
.doOnError(...)
.finallyDo(...);
Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
问题
我对 RxJava 很陌生,所以我很确定我错过了一些基本的东西。问题是:我能做什么愚蠢的事?如果从我到目前为止所说的内容来看这并不明显,我可以做些什么来帮助诊断问题?