0

我有一个包含多个订阅者的发布主题:

这是课程:

class Real {

    private val publisher: PublishSubject<String> = PublishSubject.create()

    fun doPublish() {
        for (i in 1 until 20) {
            publisher.onNext("$i Hello")
        }
        publisher.onComplete()
    }

    fun doSubscribe() {
        publisher.subscribe {
            println("Subscriber1 $it")
        }

        publisher.subscribe {
            println("Subscriber2 $it")
        }

        publisher.subscribe {
            println("Subscriber3 $it")
        }

    }
}

我打电话doSubscribe()之前我打电话doPublish() 输出如下:

 Task :Main.main()
Subscriber1 1 Hello
Subscriber2 1 Hello
Subscriber3 1 Hello
Subscriber1 2 Hello
Subscriber2 2 Hello
Subscriber3 2 Hello
Subscriber1 3 Hello
Subscriber2 3 Hello
Subscriber3 3 Hello
Subscriber1 4 Hello
Subscriber2 4 Hello
Subscriber3 4 Hello
Subscriber1 5 Hello
Subscriber2 5 Hello
Subscriber3 5 Hello
Subscriber1 6 Hello
Subscriber2 6 Hello
Subscriber3 6 Hello
Subscriber1 7 Hello
Subscriber2 7 Hello
Subscriber3 7 Hello
Subscriber1 8 Hello
Subscriber2 8 Hello
Subscriber3 8 Hello
Subscriber1 9 Hello
Subscriber2 9 Hello
Subscriber3 9 Hello
Subscriber1 10 Hello
Subscriber2 10 Hello
Subscriber3 10 Hello
Subscriber1 11 Hello
Subscriber2 11 Hello
Subscriber3 11 Hello
Subscriber1 12 Hello
Subscriber2 12 Hello
Subscriber3 12 Hello
Subscriber1 13 Hello
Subscriber2 13 Hello
Subscriber3 13 Hello
Subscriber1 14 Hello
Subscriber2 14 Hello
Subscriber3 14 Hello
Subscriber1 15 Hello
Subscriber2 15 Hello
Subscriber3 15 Hello
Subscriber1 16 Hello
Subscriber2 16 Hello
Subscriber3 16 Hello
Subscriber1 17 Hello
Subscriber2 17 Hello
Subscriber3 17 Hello
Subscriber1 18 Hello
Subscriber2 18 Hello
Subscriber3 18 Hello
Subscriber1 19 Hello
Subscriber2 19 Hello
Subscriber3 19 Hello

根据上述程序,第一个订阅者首先收到事件,然后是第二个和第三个,这完全按照订阅的顺序。

这个执行顺序有保证吗?因为我无法找到有关此的相关文档。

4

1 回答 1

0

请查看PublishSubject实现:

订阅会发生什么?

通过 add-method (b[n] = ps;) 创建 PublishDisposable 并将其添加到“订阅者”数组中

现在 PublishSubject 有一个订阅者数组,它尊重插入顺序

@Override
protected void subscribeActual(Observer<? super T> t) {
    PublishDisposable<T> ps = new PublishDisposable<T>(t, this);
    t.onSubscribe(ps);
    if (add(ps)) {
        // if cancellation happened while a successful add, the remove() didn't work
        // so we need to do it again
        if (ps.isDisposed()) {
            remove(ps);
        }
    } else {
        ...
    }
}

boolean add(PublishDisposable<T> ps) {
    for (;;) {
        PublishDisposable<T>[] a = subscribers.get();
        if (a == TERMINATED) {
            return false;
        }

        int n = a.length;
        @SuppressWarnings("unchecked")
        PublishDisposable<T>[] b = new PublishDisposable[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = ps;

        if (subscribers.compareAndSet(a, b)) {
            return true;
        }
    }
}

源现在通过 onNext 发出一个新值。onNext 方法显示了订阅者的 onNext-invocation 调用。订阅者数组从 0...n 开始迭代。因此订阅者是按插入顺序调用的,因为根据合同 onNext 必须连续调用。

Observables 必须串行(而不是并行)向观察者发出通知。他们可能会从不同的线程发出这些通知,但通知之间必须有正式的先发生关系。(http://reactivex.io/documentation/contract.html

@Override
public void onNext(T t) {
    ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
    for (PublishDisposable<T> pd : subscribers.get()) {
        pd.onNext(t);
    }
}
于 2020-05-06T08:48:23.013 回答