我已经按照 RxJava2 wiki ( https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#operator-targeting-liftFlowableOperator
)中的描述实现了一个,除了我在操作中执行了一些测试像那样:onNext()
public final class MyOperator implements FlowableOperator<Integer, Integer> {
...
static final class Op implements FlowableSubscriber<Integer>, Subscription {
@Override
public void onNext(Integer v) {
if (v % 2 == 0) {
child.onNext(v * v);
}
}
...
}
}
该运算符是我Flowable
创建的带有背压降的链条的一部分。本质上,它看起来几乎是这样的:
Flowable.<Integer>create(emitter -> myAction(), DROP)
.filter(v -> v > 2)
.lift(new MyOperator())
.subscribe(n -> doSomething(n));
我遇到了以下问题:
- 出现背压,因此
doSomething(n)
无法处理即将到来的上游 - 由于选择了背压策略,项目被丢弃
- 但是 doSomething(n) 在执行 drop 后永远不会收到新项目,而 doSomething(n) 已准备好处理新项目
回顾 David Karnok的优秀博文http://akarnokd.blogspot.fr/2015/05/pitfalls-of-operator-implementations.html,看来我需要request(1)
在onNext()
方法中添加一个。但那是用 RxJava1 ......
所以,我的问题是:这个修复在 RxJava2 中是否足以解决我的背压问题?或者我的操作员是否必须实现所有关于原子的东西,耗尽https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#atomics-serialization-deferred-actions中描述的东西来正确处理我的背压问题?
注意:我添加了request(1)
它,它似乎工作。但我不知道这是否足够,或者我的操作员是否需要队列排水和原子等棘手的东西。
提前致谢!