0

我正在使用以下 Observable 来执行常规任务。Observable 在类首次加载到内存时启动,然后定期执行我的代码。由于它是单个 Observable,因此可以保证 [从我的测试中看到的假设] 代码永远不会再次启动并并行处理,以防它运行的时间长于间隔。

private static Subscription subscription = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
        .flatMap(new Func1<Long, Observable<String>>() {
            @Override public Observable<String> call(Long aLong) {

                // some code

                return Observable.just(null);
            }
        }).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread())
        .subscribe();

但这也有一个缺点,即 rxjava 会累积延迟的排放,并在延迟迭代完成后快速启动它们。示例:如果定时器被编程为每 1000 毫秒迭代一次,并且迭代 n 需要 5000 毫秒,则迭代 n+1、n+2、n+3 等按顺序启动,但一个接一个,并且不遵守定时器间隔.

没那么糟糕,但真正的问题是当 Android 休眠几个小时时会发生什么。因为一旦设备唤醒,rxjava 会以快速顺序启动所有错过的迭代,这会对性能造成相当大的影响。

我如何告诉 rxjava 忘记错过的迭代?如果迭代需要更长的时间,我希望计时器在迭代完成时启动,或者我想放弃错过的迭代并在到期时开始下一次迭代。我尝试使用 sample() 和其他过滤器,但不知何故并没有给我想要的效果,或者我不知道如何正确应用它们。

请注意,我不想为每次迭代创建一个新的 Observable(我可以为此使用 zip),因为我想确保代码不是从多个线程执行的。

4

1 回答 1

2

当您的 Android 设备处于睡眠模式时,不要要求 Rx 跳过某些事件。但是停止接收!

当你订阅一个流时,你有一个处理程序来取消订阅这个流。

Subscription subscription = Observable.timer(1, SECONDS).subscribe();

OnPause()您的活动方法中,您可以通过调用unsubscribe()订阅上的方法来停止流。

@Override
public void onPause() {
    subscription.unsubscribe();
}

onResume()您的活动方法中,您可以再次订阅您的信息流

@Override
public void onResume() {
   subscription = Observable.timer(1, SECONDS).subscribe();
}
于 2014-12-11T13:12:23.640 回答