我想使用同步块作为 flatMap 的来源。但是我需要使用这个构造来处理(方法 processItem),而不仅仅是在创建内部源时。
这个 Observable 每 5 分钟调用一次(例如 Observable.interval)
Observable.fromIterable(getSourceData())
.flatMapCompletable(item -> {
synchronized(LockManager.getInstance().getLockObject(item.id)){
return processItem(item)
.subscribeOn(Schedulers.io());
}
})
我的 processesItem 方法如下所示:
public Completable processItem(Item item){
return mApiSource.getItemById(item.id)
.flatMap(item ->
mItemRepository.replace(item)
)
.toCompletable();
}
两种内部方法都返回 Single。
它是从服务器定期更新的方法的一部分。我需要使用从其他项目类调用的用于修改项目(更新、删除)的方法(与项目同步)来序列化 processItem 方法调用(从服务器定期同步项目)。
主要问题是定期更新可以重写新更新的项目。
实际上我使用这个解决方案:
更新新项目的链:
public Completable updateItem(Item item){
return Completable.fromAction(() -> {
synchronized(LockManager.getInstance().getLockObject(item.id)){
mApiSource.update(item)
.flatMap(item ->
mItemRepository.replace(item)
)
.toCompletable()
.blockingAwait();
}
})
.subscribeOn(Schedulers.io())
}
定期更新链:
Observable.fromIterable(getSourceData())
.flatMapCompletable(item -> {
Completable.fromAction(() ->{
synchronized(LockManager.getInstance().getLockObject(item.id)){
processItem(item).blockingAwait();
}
})
.subscribeOn(Schedulers.io())
});
我知道这不是明确的 RxJava 解决方案。
你知道这个问题的更好解决方案吗?