当一些订阅者订阅了一个 flowable 并且这个 flowable 向订阅者发出项目时,每个订阅者可以得到不同的值吗?
例如,如果一个 flowable 发射
0, 1, 2, 3...
一个观察者得到 0、2,另一个观察者得到 1、3,依此类推,就像负载均衡器一样。
这个解决方案怎么样?
@Test
public void testFlowableLoadBalancer() {
IntStream stream = IntStream.iterate(1, i -> i + 1);
Flowable<Integer> flowable = Flowable.create(e -> stream.forEach(i -> {
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
e.onNext(i);
}), BackpressureStrategy.DROP);
ConnectableFlowable<Integer> cf = flowable.publish();
cf.filter(i -> i % 2 == 0).subscribe(i -> {
logger.info("[even] i = {}" , i);
});
cf.filter(i -> i % 2 == 1).subscribe(i -> {
logger.info("[ odd] i = {}" , i);
});
cf.connect();
}
输出 :
2016-11-11 18:15:57,884 INFO data.Rx2Test - [ odd] i = 1
2016-11-11 18:15:58,892 INFO data.Rx2Test - [even] i = 2
2016-11-11 18:15:59,895 INFO data.Rx2Test - [ odd] i = 3
2016-11-11 18:16:00,900 INFO data.Rx2Test - [even] i = 4
问题未解决:这不能自动平衡,等待其他答案。
ParallelFlowable 更接近我想做的事情。它从 2.0.5 开始可用。