我有一个PublishSubject<Event>
.
在每个新事件上,我触发一个数据库查询(一些本地缓存的数据),然后获取结果并尝试通过HTTP请求将其发布到服务器,当该服务器回复200时,我进行另一个数据库查询以删除刚刚发送的行.
这是通过链接大致如下完成的:
subject
.toSerialized()
.flatMapMaybe { getCachedData() }
.flatMap { uploadData() }
.flatMapCompletable { cleanCache() }
.subscribe()
在某些条件下,主题可能会发出两个快速事件,假设间隔为 10 毫秒。
问题是第二次发射的getCachedData ()在第一次发射的 getCachedData() 完成后立即关闭,即在cleanCache()有机会在第二次发射之前清理数据库之前。
我想以某种方式将这些flatMaps组合到一个观察者中,以便在整个链完成之前,主体不会进行新的排放,最好没有任何手工制作的信号量。
我在单个线程池调度程序上执行subscribeOn() ,它只对每个flatMap内的调用进行排序。
我看到了一些将toSerialized()添加到主题的建议,但现在我认为它与链的工作方式无关。
我看到还有lift()和compose()运算符。我试图将所有flatMaps放在后者中,这并没有改变行为。前者我仍然想知道。