我是 RxJava 的新手,如果我理解正确,Observer
那么Disposable
它可以在已经调用onSubscribe
的情况下手动停止处理。
我创建了以下代码:dispose()
@NonNull Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
private Disposable d;
@Override
public void onSubscribe(@NonNull Disposable d) {
this.d = d;
}
@Override
public void onNext(@NonNull Long aLong) {
if(!d.isDisposed()) {
System.out.println("Number onNext = " + aLong);
}
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
System.out.println("completed");
}
});
但我不知道如何调用dispose()
该订阅。作为参数subscribe
传递返回并且不接受 my没有编译错误。Observer
void
subscribeWith
Observer
这应该如何工作?我在这里有什么误解?