具体来说,我正在寻找的行为是这样的:读取操作同时发生,并且将在所有挂起的写入操作完成后执行。写操作总是等到所有其他读/写操作完成。关闭操作总是等到所有其他读/写操作完成。换句话说,这些操作应该排队。
NIO FileLocks 的官方文档没有指定这种行为。事实上,它指出:
文件锁代表整个 Java 虚拟机持有。它们不适用于控制同一虚拟机内的多个线程对文件的访问。
在提交新的 I/O 请求之前,我已经考虑过手动对所有请求进行排队并在所有未完成的 Futures 上调用 get() ,但我不知道这是否是一个好主意。
我怎样才能实现这种行为?
编辑:感谢 fge 的见解,我设法找到了解决问题的基本方法:
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ChannelAccessFactory {
public static final ExecutorService IO_THREADS = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private final Path file;
private final ReadWriteLock lock;
public ChannelAccessFactory (Path file){
this.file = file;
this.lock = new ReentrantReadWriteLock();
}
public ReadWriteLock getLock(){
return lock;
}
public ChannelAccess newAccess() throws Exception{
return new ChannelAccess(file, lock);
}
}
包装频道类:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReadWriteLock;
public class ChannelAccess implements AutoCloseable{
private final ReadWriteLock lock;
private final AsynchronousFileChannel channel;
protected ChannelAccess (Path file, ReadWriteLock lock) throws Exception{
this.lock = lock;
this.channel = AsynchronousFileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE);
}
public Future<Integer> read(final ByteBuffer buffer, final long position){
return ChannelAccessFactory.IO_THREADS.submit(new Callable<Integer>(){
@Override
public Integer call() throws InterruptedException, ExecutionException{
lock.readLock().lock();
try{
return channel.read(buffer, position).get();
}
finally {
lock.readLock().unlock();
}
}
});
}
public Future<Integer> write(final ByteBuffer buffer, final long position){
return ChannelAccessFactory.IO_THREADS.submit(new Callable<Integer>(){
@Override
public Integer call() throws InterruptedException, ExecutionException {
lock.writeLock().lock();
try{
return channel.write(buffer, position).get();
}
finally {
lock.writeLock().unlock();
}
}
});
}
public long size() throws Exception{
lock.readLock().lock();
try{
return channel.size();
}
finally{
lock.readLock().unlock();
}
}
@Override
public void close() {
lock.readLock().lock();
try{
channel.close();
}
catch (IOException e){}
finally{
lock.readLock().unlock();
}
}
}