1

具体来说,我正在寻找的行为是这样的:读取操作同时发生,并且将在所有挂起的写入操作完成后执行。写操作总是等到所有其他读/写操作完成。关闭操作总是等到所有其他读/写操作完成。换句话说,这些操作应该排队。

NIO FileLocks 的官方文档没有指定这种行为。事实上,它指出:

文件锁代表整个 Java 虚拟机持有。它们不适用于控制同一虚拟机内的多个线程对文件的访问。

在提交新的 I/O 请求之前,我已经考虑过手动对所有请求进行排队并在所有未完成的 Futures 上调用 get() ,但我不知道这是否是一个好主意。

我怎样才能实现这种行为?

编辑:感谢 fge 的见解,我设法找到了解决问题的基本方法:

import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ChannelAccessFactory {

    public static final ExecutorService IO_THREADS = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private final Path file;
    private final ReadWriteLock lock;

    public ChannelAccessFactory (Path file){
        this.file = file;
        this.lock = new ReentrantReadWriteLock();
    }

    public ReadWriteLock getLock(){
        return lock;
    }

    public ChannelAccess newAccess() throws Exception{
        return new ChannelAccess(file, lock);
    }

}

包装频道类:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReadWriteLock;

public class ChannelAccess implements AutoCloseable{

    private final ReadWriteLock lock;
    private final AsynchronousFileChannel channel;

    protected ChannelAccess (Path file, ReadWriteLock lock) throws Exception{
        this.lock = lock;
        this.channel = AsynchronousFileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE);
    }

    public Future<Integer> read(final ByteBuffer buffer, final long position){
        return ChannelAccessFactory.IO_THREADS.submit(new Callable<Integer>(){
            @Override
            public Integer call() throws InterruptedException, ExecutionException{
                lock.readLock().lock();
                try{
                    return channel.read(buffer, position).get();
                }
                finally {
                    lock.readLock().unlock();
                }
            }
        });
    }

    public Future<Integer> write(final ByteBuffer buffer, final long position){
        return ChannelAccessFactory.IO_THREADS.submit(new Callable<Integer>(){
            @Override
            public Integer call() throws InterruptedException, ExecutionException {
                lock.writeLock().lock();
                try{
                    return channel.write(buffer, position).get();
                }
                finally {
                    lock.writeLock().unlock();
                }
            }
        });
    }

    public long size() throws Exception{
        lock.readLock().lock();
        try{
            return channel.size();
        }
        finally{
            lock.readLock().unlock();
        }
    }

    @Override
    public void close() {
        lock.readLock().lock();
        try{
            channel.close();
        }
        catch (IOException e){}
        finally{
            lock.readLock().unlock();
        }
    }
}
4

1 回答 1

4

使用ReentrantReadWriteLock. 它允许许多并发读者,但只允许一个作者。

它不是一个“纯”的 NIO 解决方案,但它是一个以您想要的方式运行的原语。

当心:使用这样的锁来避免死锁:

rwlock.readLock().lock();
try {
    // do stuff
} finally {
    rwlock.readLock().unlock();
}
于 2013-01-19T01:05:08.627 回答