我是响应式编程的新手,我正在阅读Josh Long 的 Reactive Spring。
在第 4 章中,他介绍了一个小程序来展示非阻塞文件读取的实际工作原理。
package rsb.io;
import lombok.extern.log4j.Log4j2;
import org.springframework.util.FileCopyUtils;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
@Log4j2
class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> {
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private int bytesRead;
private long position;
private AsynchronousFileChannel fileChannel;
private Consumer<Bytes> consumer;
private Runnable finished;
public void read(File file, Consumer<Bytes> c, Runnable finished) throws IOException
{
this.consumer = c;
this.finished = finished;
Path path = file.toPath();
this.fileChannel = AsynchronousFileChannel.open(path,
Collections.singleton(StandardOpenOption.READ), this.executorService);
ByteBuffer buffer = ByteBuffer.allocate(FileCopyUtils.BUFFER_SIZE);
this.fileChannel.read(buffer, position, buffer, this);
while (this.bytesRead > 0) {
this.position = this.position + this.bytesRead;
this.fileChannel.read(buffer, this.position, buffer, this);
}
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
this.bytesRead = result;
if (this.bytesRead < 0) {
this.finished.run();
return;
}
buffer.flip();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
consumer.accept(Bytes.from(data, data.length));
buffer.clear();
this.position = this.position + this.bytesRead;
this.fileChannel.read(buffer, this.position, buffer, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
log.error(exc);
}
}
关于这段代码实际上是如何工作的,我有很多问题:
- 为什么我们需要一个单独的线程池?(
Executors.newFixedThreadPool(10)
) - 读取 IO 操作是否真的发生在线程池中的线程上?如果是这样,那么我旋转线程和在那里阅读有什么区别?如果非阻塞只是使用不同的线程,那我为什么需要 NIO 或非阻塞 API 呢?
- 如果执行实际读取的线程正在等待来自磁盘的数据,那么该线程会发生什么?会一直阻塞吗?
- 同样,如果线程没有被阻塞,那么为什么不让一个线程来完成所有工作呢?为什么要启动多个线程?
我知道这些问题一定是非常基本的,但我还没有得到任何一个具体的答案。