0

我有一个PublishSubject<Event>.

在每个新事件上,我触发一个数据库查询(一些本地缓存的数据),然后获取结果并尝试通过HTTP请求将其发布到服务器,当该服务器回复200,我进行另一个数据库查询以删除刚刚发送的行.

这是通过链接大致如下完成的:

subject
  .toSerialized()
  .flatMapMaybe { getCachedData() }
  .flatMap { uploadData() }
  .flatMapCompletable { cleanCache() }
  .subscribe()

在某些条件下,主题可能会发出两个快速事件,假设间隔为 10 毫秒。

问题是第二次发射的getCachedData ()在第一次发射的 getCachedData() 完成后立即关闭,即在cleanCache()有机会在第二次发射之前清理数据库之前。

我想以某种方式将这些flatMaps组合到一个观察者中,以便在整个链完成之前,主体不会进行新的排放,最好没有任何手工制作的信号量。

我在单个线程池调度程序上执行subscribeOn() ,它只对每个flatMap内的调用进行排序。

我看到了一些将toSerialized()添加到主题的建议,但现在我认为它与链的工作方式无关。

我看到还有lift()compose()运算符。我试图将所有flatMaps放在后者中,这并没有改变行为。前者我仍然想知道。

4

1 回答 1

1

将它们放在concatMapX子流中:

subject
.concatMapCompletable {
    getCachedData()
    .flatMap { uploadData() }
    .flatMapCompletable { cleanCache() }
}

我看到了一些将 toSerialized() 添加到主题的建议,但现在我认为它与链的工作方式无关

它对您的流程没有实际影响,除非您实际驱动Subject它和来自多个线程的返回。

于 2021-02-05T12:21:50.440 回答