0

我是 RxJava 的新手,如果我理解正确,Observer那么Disposable它可以在已经调用onSubscribe的情况下手动停止处理。 我创建了以下代码:dispose()

@NonNull Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
      private Disposable d;

      @Override
      public void onSubscribe(@NonNull Disposable d) {
           this.d = d;
      }

      @Override
      public void onNext(@NonNull Long aLong) {
           if(!d.isDisposed()) {
              System.out.println("Number onNext = " + aLong);
           }
      }

       @Override
       public void onError(@NonNull Throwable e) {

       }

       @Override
       public void onComplete() {
           System.out.println("completed");
       }
 });

但我不知道如何调用dispose()该订阅。作为参数subscribe传递返回并且不接受 my没有编译错误。ObservervoidsubscribeWithObserver

这应该如何工作?我在这里有什么误解?

4

2 回答 2

1

JavaDocs有一个简单的Observable例子:

Disposable d = Observable.just("Hello world!")
     .delay(1, TimeUnit.SECONDS)
     .subscribeWith(new DisposableObserver<String>() {
         @Override public void onStart() {
             System.out.println("Start!");
         }
         @Override public void onNext(String t) {
             System.out.println(t);
         }
         @Override public void onError(Throwable t) {
             t.printStackTrace();
         }
         @Override public void onComplete() {
             System.out.println("Done!");
         }
     });

 Thread.sleep(500);
 // the sequence can now be disposed via dispose()
 d.dispose();

编辑

以下示例是Disposable退出该方法的onSubscribe方法,但通常不推荐:

// field in the owner class
Disposable disposable;

public void doReactive() {
    Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
    src.subscribe(new Observer<Long>() {

        @Override
        public void onSubscribe(@NonNull Disposable d) {
           disposable = d;
        }

        // ...
    });
}

public void cleanup() {
   if (disposable != null) {
       disposable.dispose();
       disposable = null;
   }
}

或者

SerialDisposable sd = new SerialDisposable();

Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
    src.subscribe(new Observer<Long>() {

        @Override
        public void onSubscribe(@NonNull Disposable d) {
           sd.set(d);
        }

        // ...
    });

// ...

sd.dispose();
于 2021-06-15T14:36:05.257 回答
0

您可以使用 DisposableObserver ,当您完成观察时可以轻松处理它。

@NonNull Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
    src.subscribe(new DisposableObserver<Long>() {
        @Override
        public void onNext(@NotNull Long aLong) {
            //Do anything you want to do..
            dispose();
        }

        @Override
        public void onError(@NotNull Throwable e) {
            //Handle the errors here..
            dispose();
        }

        @Override
        public void onComplete() {
            dispose();
        }
    });

您还可以使用 CompositeDisposable 一次处理多个观察者,有关更多详细信息,请查看此内容。

https://www.tutorialspoint.com/rxjava/rxjava_compositedisposable.htm

于 2021-06-15T10:12:25.043 回答