1

我正在开发一个新的响应式项目,其中正在进行大量文件处理 IO。如果我以命令式阻塞方式编写 IO 代码然后将它们包装在 Mono 中,将它们发布到 boundedElastic 调度程序上是否足够?boundedElastic 池大小会限制并发操作的数量吗?

如果这不是正确的方法,您能否举例说明如何使用 Reactor 将字节写入文件?

4

1 回答 1

1

如果我以命令式阻塞方式编写 IO 代码然后将它们包装在 Mono 中,将它们发布到 boundedElastic 调度程序上是否足够?

这在某种程度上归结为意见 - 但不,对于一个反应性的新建项目恕我直言,肯定不是理想的。调度程序非常适合在必须boundedElastic()时与阻塞 IO 进行交互,但当存在真正的非阻塞解决方案时,它们并不是一个好的替代品。(有时这对于文件处理有点争议,因为这取决于底层系统是否有可能异步执行它 - 但现在通常这是可能的。)

在您的情况下,我会考虑包装AsynchronousFileChannel在反应式发布者中。您需要为此使用create()or push(),然后显式调用sink,但具体如何执行取决于您的用例。作为文件写入的“最简单情况”,您可以执行以下操作:

static Mono<Void> writeToFile(AsynchronousFileChannel channel, String content) {
    return Mono.create(sink -> {
        byte[] bytes = content.getBytes();
        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
        buffer.put(bytes);
        buffer.flip();

        channel.write(buffer, 0, null, new CompletionHandler<>() {
            @Override
            public void completed(Integer result, Object attachment) {
                sink.success();
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                sink.error(exc);
            }
        });
    });
}

可以在此处找到桥接这两个 API 的更彻底/全面的示例- 几乎可以肯定周围还有其他 API。

于 2020-06-04T09:59:24.033 回答