1

当一些订阅者订阅了一个 flowable 并且这个 flowable 向订阅者发出项目时,每个订阅者可以得到不同的值吗?

例如,如果一个 flowable 发射

0, 1, 2, 3...

一个观察者得到 0、2,另一个观察者得到 1、3,依此类推,就像负载均衡器一样。

4

2 回答 2

0

这个解决方案怎么样?

  @Test
  public void testFlowableLoadBalancer() {
    IntStream stream = IntStream.iterate(1, i -> i + 1);
    Flowable<Integer> flowable = Flowable.create(e -> stream.forEach(i -> {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ignored) {
      }
      e.onNext(i);
    }), BackpressureStrategy.DROP);


    ConnectableFlowable<Integer> cf = flowable.publish();

    cf.filter(i -> i % 2 == 0).subscribe(i -> {
      logger.info("[even] i = {}" , i);
    });

    cf.filter(i -> i % 2 == 1).subscribe(i -> {
      logger.info("[ odd] i = {}" , i);
    });

    cf.connect();
  }

输出 :

2016-11-11 18:15:57,884 INFO  data.Rx2Test - [ odd] i = 1
2016-11-11 18:15:58,892 INFO  data.Rx2Test - [even] i = 2
2016-11-11 18:15:59,895 INFO  data.Rx2Test - [ odd] i = 3
2016-11-11 18:16:00,900 INFO  data.Rx2Test - [even] i = 4

问题未解决:这不能自动平衡,等待其他答案。

于 2016-11-11T10:18:31.727 回答
0

ParallelFlowable 更接近我想做的事情。它从 2.0.5 开始可用。

于 2017-03-02T06:44:48.583 回答