1

我尝试在下面测试sereialize().

我调用onNext了 1,000,000 次以从 2 个不同的线程中计数。然后,我预计在onComplete.

但是,我无法获得预期值。

private static int count = 0;

private static void setCount(int value) {
  count = value;
}

private static final int TEST_LOOP = 10;

private static final int NEXT_LOOP = 1_000_000;

@Test
public void test() throws Exception {

  for (int test = 0; test < TEST_LOOP; test++) {
    Flowable.create(emitter -> {
      ExecutorService service = Executors.newCachedThreadPool();
      emitter.setCancellable(() -> service.shutdown());

      Future<Boolean> future1 = service.submit(() -> {
        for (int i = 0; i < NEXT_LOOP; i++) {
          emitter.onNext(i);
        }
        return true;
      });

      Future<Boolean> future2 = service.submit(() -> {
        for (int i = 0; i < NEXT_LOOP; i++) {
          emitter.onNext(i);
        }
        return true;
      });

      if (future1.get(1, TimeUnit.SECONDS)
          && future2.get(1, TimeUnit.SECONDS)) {
        emitter.onComplete();
      }
    }, BackpressureStrategy.BUFFER)
        .serialize()
        .cast(Integer.class)
        .subscribe(new Subscriber<Integer>() {

          private int count = 0;

          @Override
          public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE);
          }

          @Override
          public void onNext(Integer t) {
            count++;
          }

          @Override
          public void onError(Throwable t) {
            fail(t.getMessage());
          }

          @Override
          public void onComplete() {
            setCount(count);
          }
        });

    assertThat(count, is(NEXT_LOOP * 2));
  }
}

我想知道是否serialize()不起作用或我误解了的用法serialize()

我检查了 SerializedSubscriber 的来源。

@Override
public void onNext(T t) {
  ...
  synchronized(this){
    ...
  }
  actual.onNext(t);
  emitLoop();
}

由于actual.onNext(t);是从同步块中调用的,我想actual.onNext(t);可以同时从不同的线程调用。另外,我猜可能会在完成onComplete之前打电话。onNext

我使用了 RxJava 2.0.4。

4

1 回答 1

1

这不是错误,而是滥用FlowableEmitter

onNext、onError 和 onComplete 方法应该按顺序调用,就像订阅者的方法一样。如果要确保这一点,请使用serialize() 。其他方法是线程安全的。

FlowableEmitter.serialize()

对于运营商来说,申请Flowable.serialize()为时已晚。create

于 2017-02-01T20:02:03.333 回答