简而言之,是否有任何解决方案可以解决 RxJava 中的背压,而无需借助丢弃项目、序列化操作或无界缓冲?
考虑以下任务作为何时有用的示例。
- 将数据从磁盘读入内存
- 压缩数据
- 通过网络传输压缩数据
直接的方法是在单个后台线程上按顺序执行所有任务,如下所示:
observeBlocksOfFileContents(file).
.subscribeOn(backgroundScheduler)
.map(compressBlock)
.subscribe(transmitBlock);
虽然这没有问题,但从性能的角度来看,它是次优的,因为运行时是所有三个操作的总和,而不是并行运行时它们中的最大值:
observeBlocksOfFileContents(file).
.subscribeOn(diskScheduler)
.observeOn(cpuScheduler)
.map(compressBlock)
.observeOn(networkScheduler)
.subscribe(transmitBlock);
但是,如果从磁盘读取数据的速度快于压缩和传输的速度,这可能会由于背压而失败。通常的背压解决方案是不可取的,原因如下:
- 丢件:文件必须完整传输,不得遗漏
- 在单线程上序列化:流水线的性能提升丢失了
- 调用栈阻塞:在 RxJava 中不支持
- 增加observeOn buffers:内存消耗可能会变成文件大小的几倍
- 在没有 MissingBackpressureException 的情况下重新实现 observeOn:大量工作并破坏了流畅的 API
还有其他解决方案吗?还是这根本不适合 ReactiveX 可观察模型?