使用 RxJava3,给定 anObservable
和 a Subject
,我可以将 Subject 订阅到 Observable:
observable.subscribe(subject); // returns void, not a subscription
后来我的 Subject 对 Observable 不再感兴趣了,如何退订 Observable 呢?
我认为最简单的选择是使用返回 a的重载subscribe
Disposable
,并让每个处理程序在您的 上调用适当的方法Subject
,如下所示:
Disposable d = observable
.subscribe(subject::onNext, subject::onError, subject::onComplete);
// Later
d.dispose();
您还可以创建一个DisposableObserver
将所有消息转发到Subject
, 并使用subscribeWith
而不是subscribe
,尽管它更冗长:
Disposable d = observable
.subscribeWith(new DisposableObserver<Integer>() {
@Override public void onStart() {
}
@Override public void onNext(Integer t) {
subject.onNext(t);
}
@Override public void onError(Throwable t) {
subject.onError(t);
}
@Override public void onComplete() {
subject.onComplete();
}
});
我不知道有任何更简洁的选项, RxJava 错误跟踪器的这个问题似乎支持了这一点,尽管它是针对 RxJava2 的。