1

简而言之,是否有任何解决方案可以解决 RxJava 中的背压,而无需借助丢弃项目、序列化操作或无界缓冲?

考虑以下任务作为何时有用的示例。

  1. 将数据从磁盘读入内存
  2. 压缩数据
  3. 通过网络传输压缩数据

直接的方法是在单个后台线程上按顺序执行所有任务,如下所示:

observeBlocksOfFileContents(file).
    .subscribeOn(backgroundScheduler)
    .map(compressBlock)
    .subscribe(transmitBlock);

虽然这没有问题,但从性能的角度来看,它是次优的,因为运行时是所有三个操作的总和,而不是并行运行时它们中的最大值:

observeBlocksOfFileContents(file).
    .subscribeOn(diskScheduler)
    .observeOn(cpuScheduler)
    .map(compressBlock)
    .observeOn(networkScheduler)
    .subscribe(transmitBlock);

但是,如果从磁盘读取数据的速度快于压缩和传输的速度,这可能会由于背压而失败。通常的背压解决方案是不可取的,原因如下:

  1. 丢件:文件必须完整传输,不得遗漏
  2. 在单线程上序列化:流水线的性能提升丢失了
  3. 调用栈阻塞:在 RxJava 中不支持
  4. 增加observeOn buffers:内存消耗可能会变成文件大小的几倍
  5. 在没有 MissingBackpressureException 的情况下重新实现 observeOn:大量工作并破坏了流畅的 API

还有其他解决方案吗?还是这根本不适合 ReactiveX 可观察模型?

4

1 回答 1

1

6)实现observeBlocksOfFileContents,使其支持背压。

文件系统已经是基于拉取的(InputStream.read() 在你想要它而不是扔给你的时候发生)所以考虑一个合理的块大小并在每个请求中读取它:

Observable.create(SyncOnSubscribe.createStateful(
    () -> new FileInputStream("file.dat")
    (in, out) -> {
        byte[] buf = new byte[4096];
        int r = in.read(buf);
        if (r < 0) {
            out.onCompleted();
        } else {
            if (r == buf.length) {
                out.onNext(buf);
            } else {
                byte[] buf2 = new byte[r];
                System.arraycopy(buf, 0, buf2, 0, r);
                out.onNext(buf2);
            }
        }

    }, 
    in -> in.close()
));

(为简洁起见,省略了尝试捕获。)

于 2016-07-29T07:00:29.450 回答