4

I have an Observable that emits ticks every second:

Observable.interval(0, 1, TimeUnit.SECONDS)
    .take(durationInSeconds + 1));

I'd like to pause this Observable so it stops emitting numbers, and resume it on demand.

There are some gotchas:

  • according to the Observable Javadoc, interval operator doesn't support backpressure
  • the RxJava wiki about backpressure has a section about Callstack blocking as a flow-control alternative to backpressure:

Another way of handling an overproductive Observable is to block the callstack (parking the thread that governs the overproductive Observable). This has the disadvantage of going against the “reactive” and non-blocking model of Rx. However this can be a viable option if the problematic Observable is on a thread that can be blocked safely. Currently RxJava does not expose any operators to facilitate this.

Is there a way to pause an interval Observable? Or should I implement my own 'ticking' Observable with some backpressure support?

4

1 回答 1

8

有很多方法可以做到这一点。例如,您仍然可以使用interval()和维护两个附加状态:例如布尔标志“暂停”和计数器。

public static final Observable<Long> pausableInterval(
  final AtomicBoolean paused, long initial, long interval, TimeUnit unit, Scheduler scheduler) {

  final AtomicLong counter = new AtomicLong();
  return Observable.interval(initial, interval, unit, scheduler)
      .filter(tick -> !paused.get())
      .map(tick -> counter.getAndIncrement()); 
}

然后你只需在某处调用 paused.set(true/false) 即可暂停/恢复

编辑 2016-06-04

上面的解决方案有一个小问题。如果我们多次重用 observable 实例,它将从最后一次取消订阅时的值开始。例如:

Observable<Long> o = pausableInterval(...)
List<Long> list1 = o.take(5).toList().toBlocking().single();
List<Long> list2 = o.take(5).toList().toBlocking().single();

虽然 list1 应该是 [0,1,2,3,4],但 list2 实际上是 [5,6,7,8,9]。如果不希望出现上述行为,则必须将 observable 设为无状态。这可以通过 scan() 运算符来实现。修改后的版本可能如下所示:

  public static final Observable<Long> pausableInterval(final AtomicBoolean pause, final long initialDelay, 
      final long period, TimeUnit unit, Scheduler scheduler) {

    return Observable.interval(initialDelay, period, unit, scheduler)
        .filter(tick->!pause.get())
        .scan((acc,tick)->acc + 1);
  }

或者,如果您不希望依赖于 Java 8 和 lambda,您可以使用 Java 6+ 兼容代码执行以下操作:

https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361

于 2016-03-04T20:03:30.993 回答