9

背景

我有许多 RxJava Observables(要么从 Jersey 客户端生成,要么使用 stubs 生成Observable.just(someObject))。它们都应该只发出一个值。我有一个组件测试,它模拟了所有 Jersey 客户端和使用Observable.just(someObject),并且我看到了与运行生产代码时相同的行为。

我有几个类对这些 observables 起作用,执行一些计算(以及一些副作用——我以后可能会让它们直接返回值)并返回空的 void observables。

有一次,在一个这样的课程中,我试图压缩我的几个源 observables 然后映射它们 - 如下所示:

public Observable<Void> doCalculation() {
    return Observable.zip(
        getObservable1(),
        getObservable2(),
        getObservable3(),
        UnifyingObject::new
    ).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
}

// in Unifying Object
public Observable<Void> processToNewObservable() {
    // ... do some calculation ...
    return Observable.empty();
}

然后将计算类全部组合并等待:

// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
        .toBlocking().lastOrDefault(null);

问题

问题是,processToNewObservable()永远不会被执行。通过消除过程,我可以看到这就是getObservable1()麻烦所在 - 如果我将其替换为Observable.just(null),一切都会按照我的想象执行(但在我想要一个真实的地方使用空值)。

重申一下,在生产代码中从 Jersey 客户端返回一个 Observable,但该客户端是在我的测试中getObservable1()返回的 Mockito 模拟。Observable.just(someValue)

调查

如果我转换getObservable1()为阻塞,然后将第一个值包装在 中just(),一切都会按照我的想象执行(但我不想引入阻塞步骤):

Observable.zip(
    Observable.just(getObservable1().toBlocking().first()),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())

我的第一个想法是,也许其他东西正在消耗我的 observable 发出的值,并且zip看到它已经完成,因此确定压缩它们的结果应该是一个空的 observable。但是,我尝试添加.cache()到我认为相关的每个可观察源,但这并没有改变行为。

我还尝试在 zip 之前在 getObservable1 上添加 next / error / complete / finally 处理程序(不将其转换为阻塞),但它们都没有执行:

getObservable1()
    .doOnNext(...)
    .doOnCompleted(...)
    .doOnError(...)
    .finallyDo(...);

Observable.zip(
    getObservable1(),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())

问题

我对 RxJava 很陌生,所以我很确定我错过了一些基本的东西。问题是:我能做什么愚蠢的事?如果从我到目前为止所说的内容来看这并不明显,我可以做些什么来帮助诊断问题?

4

3 回答 3

1

Observable 必须发出来启动链。您必须将您的管道视为 Observable 发出时会发生什么的声明。

您没有分享实际观察到的内容,但 Observable.just() 导致 Observable 立即发出包装的对象。

于 2015-12-13T23:59:53.997 回答
0

根据评论中的响应,其中一个getObservable不返回任何值而只是完成,或者 Mockito 模拟做错了什么。以下独立示例适用于我。你能检查一下并开始慢慢改变它,看看哪里出了问题吗?

Observable.zip(
        Observable.just(1),
        Observable.just(2),
        Observable.just(3),
        (a, b, c) -> new Integer[] { a, b, c })
 .concatMap(a -> Observable.from(a))
 .subscribe(System.out::println)
 ;
于 2015-12-14T15:19:55.687 回答
0

注意:我在这里的回答不是很令人满意,所以我进一步挖掘并发现了一个更小的复制案例,所以我在这里提出了一个新问题:为什么我的 RxJava Observable 只向第一个消费者发出?


我已经解决了至少部分问题(并且,向所有试图回答的人道歉,鉴于我的解释,我认为你没有多少机会)。

执行这些计算的各种类都返回了Observable.empty()(按照processToNewObservable()我原来的例子)。据我所知,Observable.zip()在第 N-1 个可观察对象发出值之前,它不会订阅它正在压缩的第 N 个可观察对象。

我最初的例子声称这getObservable1()是行为不端 - 这实际上有点不准确,它是后来在参数列表中观察到的。据我了解,使其阻塞,然后将该值再次转换为 Observable 的原因是因为使其阻塞并首先调用强制其执行,而我得到了我想要的副作用。

如果我将所有计算类更改为 return Observable.just(null),则一切正常:zip()所有计算类的可观察对象的最终结果都通过它们起作用,因此所有预期的副作用都会发生。

从设计的角度来看,返回一个空 Void 似乎我肯定做错了什么,但至少这个特定问题得到了回答。

于 2015-12-14T20:01:31.777 回答