11

我有以下 Hot Observable:

hotObservable = Observable.interval(0L, 1L, TimeUnit.SECONDS)
                          .map((t) -> getCurrentTimeInMillis()))

但是,我找不到阻止它的好方法。我能够使用标志 ( ) 部分解决takeWhileboolean问题runTimer

Observable.interval(0L, 1L, TimeUnit.SECONDS)
          .takeWhile((t) -> runTimer)
          .map((t) -> getCurrentTimeInMillis()))

不过,我不喜欢这种方法有两件事:

  1. 我必须保留旗帜runTimer,这是我不想要的。
  2. 一旦runTimer变为false,Observable 就完成了,这意味着如果我想再次发出,我需要创建一个新的 Observable。我不想要那个。我只希望 Observable 停止发射项目,直到我告诉它重新开始。

我希望有这样的事情:

hotObservable.stop();
hotObservable.resume();

这样我就不需要保留任何标志,并且 observable 始终处于活动状态(尽管它可能不会发出事件)。

我怎样才能做到这一点?

4

2 回答 2

9

一种可能的方法使用 BehaviorSubject 和 switchMap:

BehaviorSubject<Boolean> subject = BehaviorSubject.create(true);
hotObservable = subject.distinctUntilChanged().switchMap((on) -> {
    if (on) {
        return Observable.interval(0L, 1L, TimeUnit.SECONDS);
    } else {
        return Observable.never();
    }
}).map((t) -> getCurrentTimeInMillis());

通过向主题发送布尔值,可以控制 observable 的输出。subject.onNext(true)将导致使用该主题创建的任何可观察对象开始发射值。subject.onNext(false)禁用该流。

switchMap它被关闭时,它负责处理底层的 observables。它还用于distinctUntilChanged确保它不会进行不必要的切换。

于 2016-12-10T02:55:53.633 回答
1

Mybe你可以用这个

Observable.interval(3, TimeUnit.SECONDS)
        .takeUntil(stop)
        .subscribe(new MyObserver());

Thread.sleep(10000);
stop.onNext(-1);
于 2017-06-11T05:02:11.287 回答