1

操作员是否在剩余并行任务仍在运行时或仅在它们完成后将其sequential()转换为ParallelFlowable发出项目的流?Flowable

例如:
假设我有 5 个需要并行执行的长时间运行的时变任务。我使用 aParallelFlowable来执行这些任务;之后我添加一个sequential()运算符;然后是take(1)操作员。我会... :

  • 在其余并行任务执行时获取第一项?或
  • 需要等到所有任务完成,然后ParallelFlowable转换为Flowable?

我希望它应该是上述的先验,但想要确认。

4

1 回答 1

2

如果您不确定 RxJava 在某些简单情况下的行为方式,最好的办法是编写一个小程序或单元测试,其中包含几个System.out.println()或与.test()和一些.assertValues()/ .assertNotComplete()/ 等。在不同的地方,看看在何时何地发生了什么。

source
.doOnNext(v -> System.out.println("Source sent: " + v))
// .operator1()
// .operator2()
// etc.

RxJava 是一个组合库,这意味着您通常可以隔离源、处理逻辑和最终消费者,以便可以单独测试它们。例如,您可以将网络调用替换为简单Observable.just()的持续响应。在其他情况下,您可以创建一个PublishSubjectonNext以逐步的方式调用,检查每一个之后的预期状态。

 PublishSubject<Integer> publishSource = PublishSubject.create();

 Observable<Integer> squares = publishSource
 .map(v -> v * v);

但是,为此,您通常需要将处理逻辑分解为所谓的函数转换器方法,这些方法采用类型化但不受约束的源(即)PublishSubject并返回一些其他类型(Observable测试消费者。

 public static Observable<Integer> square(Observable<Integer> source) {
     return source.map(v -> v * v);
 }

 Observable<Integer> squared = square(Observable.just(2));

 Observable<Integer> anotherSqr = square(publishSource);

最后,您可以使用最后一个Observable源或简单地查看可以打印到控制台的内容:

 square(publishSource
     .doOnNext(v -> System.out.println("Input: " + v))
 )
 .subscribe(
     v -> System.out.println("Output: " + v),
     Throwable::printStackTrace, 
     () -> System.out.println("Done")
 );

 publishSource.onNext(1);

 // Input: 1
 // Output: 2

 publishSource.onNext(2);

 // Input: 2
 // Output: 4

 publishSource.onComplete();

 // Done

如果转换涉及异步或定时延迟,您可以插入Thread.sleep()s,以便可以在输出/断言中分离对多个输入的响应。

(至于您想要的确认sequential不等待所有项目完成,只要它们可用并且下游准备好接收它们,它就会从任何平行轨道发出项目。)

于 2018-03-23T13:58:23.800 回答