1

使用 RxJava3,给定 anObservable和 a Subject,我可以将 Subject 订阅到 Observable:

observable.subscribe(subject); // returns void, not a subscription

后来我的 Subject 对 Observable 不再感兴趣了,如何退订 Observable 呢?

4

1 回答 1

2

我认为最简单的选择是使用返回 a的重载subscribeDisposable,并让每个处理程序在您的 上调用适当的方法Subject,如下所示:

Disposable d = observable
    .subscribe(subject::onNext, subject::onError, subject::onComplete);

// Later
d.dispose();

您还可以创建一个DisposableObserver将所有消息转发到Subject, 并使用subscribeWith而不是subscribe,尽管它更冗长:

Disposable d = observable
.subscribeWith(new DisposableObserver<Integer>() {
         @Override public void onStart() {
         }
         @Override public void onNext(Integer t) {
             subject.onNext(t);
         }
         @Override public void onError(Throwable t) {
             subject.onError(t);
         }
         @Override public void onComplete() {
             subject.onComplete();
         }
     });

我不知道有任何更简洁的选项, RxJava 错误跟踪器的这个问题似乎支持了这一点,尽管它是针对 RxJava2 的。

于 2020-07-14T14:42:10.897 回答