如果您不确定 RxJava 在某些简单情况下的行为方式,最好的办法是编写一个小程序或单元测试,其中包含几个System.out.println()
或与.test()
和一些.assertValues()
/ .assertNotComplete()
/ 等。在不同的地方,看看在何时何地发生了什么。
source
.doOnNext(v -> System.out.println("Source sent: " + v))
// .operator1()
// .operator2()
// etc.
RxJava 是一个组合库,这意味着您通常可以隔离源、处理逻辑和最终消费者,以便可以单独测试它们。例如,您可以将网络调用替换为简单Observable.just()
的持续响应。在其他情况下,您可以创建一个PublishSubject
并onNext
以逐步的方式调用,检查每一个之后的预期状态。
PublishSubject<Integer> publishSource = PublishSubject.create();
Observable<Integer> squares = publishSource
.map(v -> v * v);
但是,为此,您通常需要将处理逻辑分解为所谓的函数转换器方法,这些方法采用类型化但不受约束的源(即)PublishSubject
并返回一些其他类型(Observable
测试消费者。
public static Observable<Integer> square(Observable<Integer> source) {
return source.map(v -> v * v);
}
Observable<Integer> squared = square(Observable.just(2));
Observable<Integer> anotherSqr = square(publishSource);
最后,您可以使用最后一个Observable
源或简单地查看可以打印到控制台的内容:
square(publishSource
.doOnNext(v -> System.out.println("Input: " + v))
)
.subscribe(
v -> System.out.println("Output: " + v),
Throwable::printStackTrace,
() -> System.out.println("Done")
);
publishSource.onNext(1);
// Input: 1
// Output: 2
publishSource.onNext(2);
// Input: 2
// Output: 4
publishSource.onComplete();
// Done
如果转换涉及异步或定时延迟,您可以插入Thread.sleep()
s,以便可以在输出/断言中分离对多个输入的响应。
(至于您想要的确认,sequential
不等待所有项目完成,只要它们可用并且下游准备好接收它们,它就会从任何平行轨道发出项目。)