1

我目前正在开发一个需要随机访问许多(60k-100k)相对大文件的应用程序。由于打开和关闭流是一项相当昂贵的操作,因此我更愿意将最大文件的 FileChannels 保持打开,直到不再需要它们为止。

问题是,由于 Java 7 的 try-with-resources 语句无法涵盖这种用法,我需要手动关闭所有 FileChannel。但这变得越来越复杂,因为软件中的多个位置可能同时访问同一个文件。

我已经实现了一个 ChannelPool 类,它可以跟踪每个已注册 Path 所对应的已打开 FileChannel 实例。然后可以让 ChannelPool 按一定的时间间隔,关闭那些 Path 仅被池本身弱引用的通道。我更倾向于事件监听器的方式,但我也不想去监听 GC。

来自 Apache Commons 的 FileChannelPool 没有解决我的问题,因为通道仍然需要手动关闭。

这个问题有更优雅的解决方案吗?如果没有,如何改进我的实施?

import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Process-wide pool of read-only {@link FileChannel}s, keyed by {@code path.toString()}.
 *
 * <p>Each pooled channel is stored together with a {@link WeakReference} to the
 * {@link Path} instance that caused it to be opened. A periodic clean-up task
 * (started by {@link #initCleanUp()}) closes and evicts every channel whose
 * {@code Path} has been garbage collected.
 *
 * <p>NOTE(review): the map key is the path <em>string</em>, but liveness is tracked
 * only for the first {@code Path} <em>instance</em> registered under that string.
 * If another, equal {@code Path} instance is still in use when the first one is
 * collected, its channel is closed as well; callers then obtain a fresh channel
 * from the next {@link #getChannel(Path)} call.
 */
public class ChannelPool {

    private static final ChannelPool defaultInstance = new ChannelPool();

    private final ConcurrentHashMap<String, ChannelRef> channels;
    private final Timer timer;

    private ChannelPool(){
        channels = new ConcurrentHashMap<>();
        // Daemon thread: merely loading this class must not keep the JVM alive
        // when the application forgets to call shutDown().
        timer = new Timer("ChannelPool-cleanup", true);
    }

    /**
     * @return the shared pool instance
     */
    public static ChannelPool getDefault(){
        return defaultInstance;
    }

    /**
     * Starts the periodic clean-up: first run after 2 seconds, then every 10 seconds.
     * Must not be called after {@link #shutDown()}, because the underlying
     * {@link Timer} rejects new tasks once cancelled.
     */
    public void initCleanUp(){
        // wait 2 seconds then repeat clean-up every 10 seconds.
        timer.schedule(new CleanUpTask(this), 2000, 10000);
    }

    /**
     * Stops the clean-up timer and closes every pooled channel.
     * Must be called manually; calling it more than once is harmless.
     */
    public void shutDown(){
        // must be called manually.
        timer.cancel();
        closeAll();
    }

    /**
     * Returns the pooled channel for {@code path}, opening it (read-only) on first use.
     *
     * <p>If the pooled channel has been closed in the meantime (for example by a
     * caller, or by an interrupt during I/O), the stale entry is evicted and a new
     * channel is opened instead of handing out the closed one.
     *
     * @param path file to open; must not be {@code null}
     * @return an open channel, or {@code null} if the file could not be opened
     */
    public FileChannel getChannel(Path path){
        final String key = path.toString();
        ChannelRef cref = channels.get(key);
        System.out.println("getChannel called " + channels.size());

        if (cref != null && !cref.channel().isOpen()){
            // Stale entry: drop it only if it is still the mapped one, then reopen below.
            channels.remove(key, cref);
            cref = null;
        }

        if (cref == null){
            cref = ChannelRef.newInstance(path);
            if (cref == null){
                // failed to open channel
                return null;
            }
            ChannelRef oldRef = channels.putIfAbsent(key, cref);
            if (oldRef != null){
                // Another thread registered a channel first: close ours and use theirs.
                if (closeQuietly(cref.channel())){
                    System.out.println("redundant channel closed");
                }
                cref = oldRef;
            }
        }
        return cref.channel();
    }

    /**
     * Closes {@code channel} on a best-effort basis.
     *
     * @return {@code true} if {@code close()} completed without an {@link IOException}
     */
    private static boolean closeQuietly(FileChannel channel){
        try {
            channel.close();
            return true;
        }
        catch (IOException ignored) {
            // Nothing useful can be done here; the entry is being discarded anyway.
            return false;
        }
    }

    /** Unconditionally evicts and closes whatever channel is mapped to {@code str}. */
    private void remove(String str) {
        ChannelRef ref = channels.remove(str);
        if (ref != null && closeQuietly(ref.channel())){
            System.out.println("old channel closed");
        }
    }

    /** Evicts and closes every channel currently in the pool. */
    private void closeAll() {
        for (Map.Entry<String, ChannelRef> e : channels.entrySet()){
            remove(e.getKey());
        }
    }

    /** Closes the channels whose originating {@code Path} has been garbage collected. */
    private void maintain() {
        // close channels for dereferenced paths
        for (Map.Entry<String, ChannelRef> e : channels.entrySet()){
            ChannelRef ref = e.getValue();
            if (ref == null || ref.pathRef().get() != null){
                continue;
            }
            // gc'd -- conditional remove so that a channel registered under the same
            // key after we read this entry is never closed by mistake.
            if (channels.remove(e.getKey(), ref) && closeQuietly(ref.channel())){
                System.out.println("old channel closed");
            }
        }
    }

    /** Immutable pair of an open channel and a weak reference to the Path that opened it. */
    private static class ChannelRef{

        private final FileChannel channel;
        private final WeakReference<Path> ref;

        private ChannelRef(FileChannel channel, WeakReference<Path> ref) {
            this.channel = channel;
            this.ref = ref;
        }

        /**
         * Opens {@code path} for reading.
         *
         * @return the new reference, or {@code null} if the channel could not be opened
         */
        private static ChannelRef newInstance(Path path) {
            FileChannel fc;
            try {
                fc = FileChannel.open(path, StandardOpenOption.READ);
            }
            catch (IOException ex) {
                // Reported to the caller as null (see getChannel).
                return null;
            }
            return new ChannelRef(fc, new WeakReference<>(path));

        }

        private FileChannel channel() {
            return channel;
        }

        private WeakReference<Path> pathRef() {
            return ref;
        }
    }

    /** Timer task that runs one maintenance pass on the pool and prints its state. */
    private static class CleanUpTask extends TimerTask {

        private final ChannelPool pool;

        private CleanUpTask(ChannelPool pool){
            super();
            this.pool = pool;
        }

        @Override
        public void run() {
            pool.maintain();
            pool.printState();
        }
    }

    /** Prints the number of pooled channels and the open/closed state of each one. */
    private void printState(){
        System.out.println("Clean up performed. " + channels.size() + " channels remain. -- " + System.currentTimeMillis());
        for (Map.Entry<String, ChannelRef> e : channels.entrySet()){
            ChannelRef cref = e.getValue();
            String out = "open: " + cref.channel().isOpen() + " - " + cref.channel().toString();
            System.out.println(out);
        }
    }

}

编辑:感谢 fge 的回答,我现在得到了我所需要的一切。谢谢!

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;

/**
 * Static access point to a Guava {@link LoadingCache} that maps a {@link Path}
 * to a read-only {@link FileChannel}.
 *
 * <p>The cache is built with {@code weakKeys()}; when an entry is removed, the
 * removal listener closes the associated channel.
 * NOTE(review): per the Guava documentation, {@code weakKeys()} makes the cache
 * compare keys by identity, so equal but distinct {@code Path} instances would
 * each get their own channel -- confirm this is intended.
 */
public class Channels {

    private static final LoadingCache<Path, FileChannel> channelCache =
            CacheBuilder.newBuilder()
            .weakKeys()
            .removalListener(new ChannelCloser())
            .build(new ChannelOpener());

    /**
     * Looks up (opening on first use) the channel cached for {@code path}.
     *
     * @param path the file to read
     * @return the cached channel, or {@code null} if loading it failed with an
     *         {@link ExecutionException}
     */
    public static FileChannel get(Path path){
        FileChannel cached = null;
        try {
            cached = channelCache.get(path);
        }
        catch (ExecutionException ignored){
            // Load failure is reported to the caller as null.
        }
        return cached;
    }

    /** Closes the channel of every entry removed from the cache. */
    private static final class ChannelCloser implements RemovalListener<Path, FileChannel> {
        @Override
        public void onRemoval(RemovalNotification<Path, FileChannel> removal) {
            try {
                removal.getValue().close();
            }
            catch (IOException ignored) {
                // Best effort: the entry is already gone from the cache.
            }
        }
    }

    /** Opens a read-only channel for a path that is not cached yet. */
    private static final class ChannelOpener extends CacheLoader<Path, FileChannel> {
        @Override
        public FileChannel load(Path path) throws IOException {
            return FileChannel.open(path, StandardOpenOption.READ);
        }
    }
}
4

1 回答 1

1

看看这里:

http://code.google.com/p/guava-libraries/wiki/CachesExplained

您可以将 LoadingCache 与移除监听器(RemovalListener)一起使用:当条目过期时,该监听器会为您关闭对应的通道;您可以指定在访问后或写入后经过多长时间过期。

于 2013-01-06T20:39:44.730 回答