我正在开发一个新的响应式项目,其中正在进行大量文件处理 IO。如果我以命令式阻塞方式编写 IO 代码然后将它们包装在 Mono 中,将它们发布到 boundedElastic 调度程序上是否足够?boundedElastic 池大小会限制并发操作的数量吗?
如果这不是正确的方法,您能否举例说明如何使用 Reactor 将字节写入文件?
我正在开发一个新的响应式项目,其中正在进行大量文件处理 IO。如果我以命令式阻塞方式编写 IO 代码然后将它们包装在 Mono 中,将它们发布到 boundedElastic 调度程序上是否足够?boundedElastic 池大小会限制并发操作的数量吗?
如果这不是正确的方法,您能否举例说明如何使用 Reactor 将字节写入文件?
如果我以命令式阻塞方式编写 IO 代码然后将它们包装在 Mono 中,将它们发布到 boundedElastic 调度程序上是否足够?
这在某种程度上归结为意见 - 但不,对于一个反应性的新建项目恕我直言,肯定不是理想的。调度程序非常适合在必须boundedElastic()
时与阻塞 IO 进行交互,但当存在真正的非阻塞解决方案时,它们并不是一个好的替代品。(有时这对于文件处理有点争议,因为这取决于底层系统是否有可能异步执行它 - 但现在通常这是可能的。)
在您的情况下,我会考虑包装AsynchronousFileChannel
在反应式发布者中。您需要为此使用create()
or push()
,然后显式调用sink
,但具体如何执行取决于您的用例。作为文件写入的“最简单情况”,您可以执行以下操作:
static Mono<Void> writeToFile(AsynchronousFileChannel channel, String content) {
return Mono.create(sink -> {
byte[] bytes = content.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
channel.write(buffer, 0, null, new CompletionHandler<>() {
@Override
public void completed(Integer result, Object attachment) {
sink.success();
}
@Override
public void failed(Throwable exc, Object attachment) {
sink.error(exc);
}
});
});
}
可以在此处找到桥接这两个 API 的更彻底/全面的示例- 几乎可以肯定周围还有其他 API。