5

我想使用同步块作为 flatMap 的来源。但是我需要使用这个构造来处理(方法 processItem),而不仅仅是在创建内部源时。

这个 Observable 每 5 分钟调用一次(例如 Observable.interval)

Observable.fromIterable(getSourceData())
.flatMapCompletable(item -> {
  synchronized(LockManager.getInstance().getLockObject(item.id)){
    return processItem(item)
          .subscribeOn(Schedulers.io());
  }
})

我的 processesItem 方法如下所示:

public Completable processItem(Item item){
  return mApiSource.getItemById(item.id)
    .flatMap(item -> 
      mItemRepository.replace(item)
    )
    .toCompletable();
}

两种内部方法都返回 Single。

它是从服务器定期更新的方法的一部分。我需要使用从其他项目类调用的用于修改项目(更新、删除)的方法(与项目同步)来序列化 processItem 方法调用(从服务器定期同步项目)。

主要问题是定期更新可以重写新更新的项目。

实际上我使用这个解决方案:

更新新项目的链:

public Completable updateItem(Item item){
  return Completable.fromAction(() -> {
        synchronized(LockManager.getInstance().getLockObject(item.id)){
          mApiSource.update(item)
          .flatMap(item -> 
            mItemRepository.replace(item)
          )
          .toCompletable()
          .blockingAwait();
        }
      })
      .subscribeOn(Schedulers.io())
}

定期更新链:

Observable.fromIterable(getSourceData())
    .flatMapCompletable(item -> {
      Completable.fromAction(() ->{
        synchronized(LockManager.getInstance().getLockObject(item.id)){
          processItem(item).blockingAwait();
        }
      })
      .subscribeOn(Schedulers.io())
    });

我知道这不是明确的 RxJava 解决方案。

你知道这个问题的更好解决方案吗?

4

0 回答 0