0

我已经按照 RxJava2 wiki ( https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#operator-targeting-liftFlowableOperator )中的描述实现了一个,除了我在操作中执行了一些测试像那样:onNext()

public final class MyOperator implements FlowableOperator<Integer, Integer> {

...

static final class Op implements FlowableSubscriber<Integer>, Subscription {

    @Override
    public void onNext(Integer v) {
        if (v % 2 == 0) {
          child.onNext(v * v);
        }  
    }
   ...
  }
}

该运算符是我Flowable创建的带有背压降的链条的一部分。本质上,它看起来几乎是这样的:

Flowable.<Integer>create(emitter -> myAction(), DROP)
        .filter(v -> v > 2)
        .lift(new MyOperator())
        .subscribe(n -> doSomething(n));

我遇到了以下问题:

  • 出现背压,因此doSomething(n)无法处理即将到来的上游
  • 由于选择了背压策略,项目被丢弃
  • 但是 doSomething(n) 在执行 drop 后永远不会收到新项目,而 doSomething(n) 已准备好处理新项目

回顾 David Karnok的优秀博文http://akarnokd.blogspot.fr/2015/05/pitfalls-of-operator-implementations.html,看来我需要request(1)onNext()方法中添加一个。但那是用 RxJava1 ......

所以,我的问题是:这个修复在 RxJava2 中是否足以解决我的背压问题?或者我的操作员是否必须实现所有关于原子的东西,耗尽https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#atomics-serialization-deferred-actions中描述的东西来正确处理我的背压问题?

注意:我添加了request(1)它,它似乎工作。但我不知道这是否足够,或者我的操作员是否需要队列排水和原子等棘手的东西。

提前致谢!

4

2 回答 2

1

FlowableOperator 是否天生就支持背压?

FlowableOperator是为给定下游调用的接口,Subscriber应返回一个新的接口,该接口Subscriber包装下游并调制在一个或两个方向上传递的 Reactive Streams 事件。背压支持是Subscriber实现的责任,而不是这个特定的功能接口。它本来可以,Function<Subscriber, Subscriber>但一个单独的命名接口被认为更有用,更不容易发生过载冲突。

需要在 onNext() [...] 中添加一个 request(1) 但我不知道这是否足够,或者我的操作员是否需要队列排水和原子的棘手内容。

是的,你也必须在 RxJava 2 中这样做。由于 RxJava 2Subscriber不是一个类,它没有 v1 的便捷request方法。您必须保存SubscriptioninonSubscribe并调用inupstream.request(1)中的适当路径onNext。对于您的情况,这应该足够了。

我已经用一个新的部分更新了 wiki,明确解释了这个案例:

https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#replenishing

final class FilterOddSubscriber implements FlowableSubscriber<Integer>, Subscription {

    final Subscriber<? super Integer> downstream;

    Subscription upstream;

    // ...

    @Override
    public void onSubscribe(Subscription s) {
        if (upstream != null) {
            s.cancel();
        } else {
            upstream = s;                    // <-------------------------
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onNext(Integer item) {
        if (item % 2 != 0) {
           downstream.onNext(item);
        } else {
           upstream.request(1);              // <-------------------------
        }
    }

    @Override
    public void request(long n) {
        upstream.request(n);
    }

    // the rest omitted for brevity
}
于 2018-02-22T15:35:31.060 回答
0

是的,你必须做一些棘手的事情......

我会避免编写运算符,除非您非常确定自己在做什么?几乎所有事情都可以使用默认运算符来实现......

在 RxJava 中编写类似源代码 (fromEmitter) 或类似中间代码 (flatMap) 的运算符一直是一项艰巨的任务。有许多规则要遵守,许多情况要考虑,但同时,要构建性能良好的代码,需要采取许多(合法的)捷径。现在专门为 2.x 编写一个运算符比为 1.x 难 10 倍。如果您想利用所有高级的第 4 代功能,那么这甚至要难上 2-3 倍(总共要难 30 倍)。

有一些棘手的东西解释了:https ://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0

于 2018-02-22T14:53:58.440 回答