我有一段代码,他的工作是更新本地缓存。此缓存更新有两个触发器:
- 以固定间隔
- 要求时
所以这是一个关于我如何做到这一点的基本示例。
forceReloadEvents = new SerializedSubject<Long, Long>(PublishSubject.<Long> create());
dataUpdates = Observable
.merge(forceReloadEvents, Observable.timer(0, pullInterval, TimeUnit.SECONDS))
.flatMap(new Func1<Long, Observable<Boolean>>() {
@Override
public Observable<Boolean> call(Long t) {
return reloadData(); // operation that may take long
}
})
.publish();
dataUpdates.subscribe();
dataUpdates.connect();
然后我有
public void forceReload() {
final CountDownLatch cdl = new CountDownLatch(1);
dataUpdates
.take(1)
.subscribe(
new Action1<Boolean>() {
@Override
public void call(Boolean b) {
cdl.countDown();
}
}
);
forceReloadEvents.onNext(-1L);
try {
cdl.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
这可行,但问题是当我开始有多个并发调用时forceReload()
:将不会并发执行reloadData()
但元素将排队并且进程将循环重新加载数据,直到发送到的所有事件forceReloadEvents
都已被消耗,即使forceReload()
已经完成由于之前的事件发布了CountDownLatch
.
我想使用onBackPressureDrop
,但似乎没有诱发背压,也没有任何东西被丢弃。我想要的是某种强制背压的方法,以便合并了解一次只能处理一个元素,并且必须删除任何后续事件,直到当前执行完成。
我考虑过使用buffer
or throttleFirst
,但我不想强制每个事件之间有一个特定的时间,我宁愿根据重新加载缓存所需的时间进行自动缩放。您可以将其视为throttleFirst
直到reloadData
完成。