我有一个Flowable,它接收文件系统事件(例如:文件更新,使用inotify),在文件系统编辑完成时缓冲文件,然后将这些事件交给处理。
这种在不同线程中发生的下游处理可能会更新已更新的文件。这样就创建了新的文件系统事件。
fileUpdateFlowable <- Flowable<FileEvent>
.onBackPressureDrop(...)
.flatMap(fileEvent ->
Flowable.just(fileEvent)
.map(fileEvent -> {
if(fileIsStable) return file;
else throw new UnstableFileException();
})
.retryWhen(f -> {
f.flatMap(error ->
if(error instanceof UnstableFileException) return Flowable.timer(delay);
else throw error;
)
})
如何使“处理”尊重 Flowable 顶部的状态?文件的缓冲意味着如果出现以下情况,我可能会遇到背压:
- 下游处理的线程过多。
- 线程碰巧编辑了很多文件。
- 文件编辑的类型碰巧会创建更多的文件系统事件(这取决于对文件执行的操作)。
我正在寻找一种处理线程在备份大量文件系统事件时延迟写入的方法。
我考虑在与Scheduler
文件系统事件处理相同的情况下运行编辑,但是缓冲区当前的工作方式是retryWhen
使用Flowable.timer
- 所以没有明显的方法来延迟这项工作。