14

我正在使用 WatchService 来观察目录的变化,特别是在目录中创建新文件。下面是我的代码 -

package watcher;

import java.nio.file.*;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
import java.io.*;


public class Watch {
    public static void main(String[] args) throws IOException {
        Path dir = Paths.get("c:\\mk\\");
        WatchService service = FileSystems.getDefault().newWatchService();
        WatchKey key = dir.register(service, ENTRY_CREATE);

        System.out.println("Watching directory: "+dir.toString());
        for(;;){
            WatchKey key1;
            try {
                key1 = service.take();
            } catch (InterruptedException x) {
                break;
            }

            for (WatchEvent<?> event: key1.pollEvents()) {
                WatchEvent.Kind<?> kind = event.kind();

                if (kind == OVERFLOW) {
                    continue;
                }

                WatchEvent<Path> ev = (WatchEvent<Path>)event;
                Path filename = ev.context();
                Path child = dir.resolve(filename);
                System.out.println("New file: "+child.toString()+" created.");
                try{
                    FileInputStream in = new FileInputStream(child.toFile());
                    System.out.println("File opened for reading");
                    in.close();
                    System.out.println("File Closed");
                }catch(Exception x){
                    x.printStackTrace();
                }
            }

            boolean valid = key.reset();
            if (!valid) {
                break;
            }
        }
    }
}

当我在“mk”目录中创建文件时,我会收到通知。但是当我在这个目录中复制一些文件时,我在打开那个复制的文件时遇到了异常。

我的猜测是 Windows Copier 对话框仍然锁定了该文件,我无法打开该文件。所以基本上我想知道的是如何获得通知文件已被其他进程关闭。

上面代码的输出就像 -

Watching directory: c:\mk
New file: c:\mk\New Text Document (2).txt created.
File opened for reading
File Closed
New file: c:\mk\Config.class created.
java.io.FileNotFoundException: c:\mk\Config.class (The process cannot access the file because it is being used by another process)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at watcher.Watch.main(Watch.java:36)
New file: c:\mk\New Text Document (3).txt created.
File opened for reading
File Closed

文件“ New Text Document (2).txt”和“ New Text Document (3).txt”我已创建,但文件“ Config.class”我已从其他目录复制。

请帮助我。

4

2 回答 2

18

我通过实现算法得到了这个工作:观察线程将把文件名放在 BlockingQueue 中,其他线程将轮询这个队列,获取文件名,尝试几次打开文件。如果文件被打开,Windows Copier 已释放文件锁定,我们可以继续。因此,当其他线程发现文件已被解锁时,其他线程会将这个文件名放入已处理队列中,我的应用程序将从那里检索文件名。还有另一个线程在打开文件检查文件解锁时,如果运行时间长了解锁文件,我们可以把这个文件名放回BlockingQueue中处理其他文件名,前者可以稍后处理。

解决方案:希望这对其他人有帮助:

package dirwatch;

import java.nio.file.*;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
import static java.nio.file.LinkOption.*;
import java.nio.file.attribute.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class WatchDir {
    private final WatchService watcher;
    private final Map<WatchKey,Path> keys;
    private final boolean recursive;
    private boolean trace = false;

    private BlockingQueue<String> fileProcessingQueue;

    //******* processedFileQueue **** will be used by other threads to retrive unlocked files.. so I have 
    // kept as public final
    public final BlockingQueue<String> processedFileQueue;
    private volatile boolean closeProcessingThread;
    private volatile boolean closeWatcherThread;


    private void processFiles(){
        System.out.println("DirWatchProcessingThread Started");
        String fileName;
        outerLoop: while(!closeProcessingThread || !fileProcessingQueue.isEmpty()){
            try{
                fileName = fileProcessingQueue.poll(1000, TimeUnit.MILLISECONDS);
            }catch(InterruptedException ie){
                fileName = null;
            }

            if(fileName == null || fileName.equals("")){
                continue outerLoop;
            }

            long startTime = System.currentTimeMillis();
            innerLoop: while(true){
                FileInputStream fis = null;
                File file = new File(fileName);
                try{
                    fis = new FileInputStream(fileName);
                    break innerLoop;
                }catch(FileNotFoundException fnfe){
                    if(!file.exists() || file.isDirectory()){
                        System.out.println("File: '"+fileName+"has been deleted in file system or it is not file. Not processing this file.");
                        continue outerLoop;
                    }
                    try{
                        Thread.sleep(WatchDirParameters.millisToPuaseForFileLock);
                    }catch(InterruptedException ie){
                    }
                    if((System.currentTimeMillis() - startTime) > WatchDirParameters.millisToSwapFileForUnlocking){
                        if(fileProcessingQueue.offer(fileName)){
                            continue outerLoop;
                        }else{
                            startTime = System.currentTimeMillis();
                            continue innerLoop;
                        }
                    }
                }finally{
                    if(fis != null){
                        try{
                            fis.close();
                        }catch(IOException ioe){
                            ioe.printStackTrace();
                        }
                    }
                }
            }

            System.out.println("Queuing File: "+fileName);
            processedLoop:while(true){
                try{
                    if(processedFileQueue.offer(fileName, 1000, TimeUnit.MILLISECONDS)){
                        break processedLoop;
                    }
                }catch(InterruptedException ie){
                    //ie.printStackTrace();
                }
            }
        }
        closeWatcherThread = true;
        closeProcessingThread = true;
        System.out.println("DirWatchProcessingThread Exited");
    }

    /**
     * Process all events for keys queued to the watcher
     */
    private void processEvents(){
        System.out.println("DirWatcherThread started.");
        while(!closeWatcherThread) {
            // wait for key to be signalled
            WatchKey key;
            try {
                key = watcher.take();
            } catch (InterruptedException x) {
                // if we are returning from these method, it means we no longer wants to watch directory
                // we must close thread which may be waiting for file names in queue
                continue;
            }catch(ClosedWatchServiceException cwse){
                break;
            }

            Path dir = keys.get(key);
            if (dir == null) {
                System.err.println("WatchKey not recognized!!");
                continue;
            }

            try{
                for (WatchEvent<?> event: key.pollEvents()) {
                    WatchEvent.Kind kind = event.kind();

                    if (kind == OVERFLOW) {
                        continue;
                    }

                    // Context for directory entry event is the file name of entry
                    WatchEvent<Path> ev = cast(event);
                    Path name = ev.context();
                    Path child = dir.resolve(name);
                    if(kind.equals(ENTRY_CREATE)){
                        // if directory is created, and watching recursively, then
                        // register it and its sub-directories
                        if (recursive) {
                            try {
                                if (Files.isDirectory(child, NOFOLLOW_LINKS)) {
                                    registerAll(child);
                                    continue;
                                }
                            } catch (IOException x) {
                                // ignore to keep sample readbale
                            }
                        }
                        while(true){
                            if(fileProcessingQueue.remainingCapacity() < 2){
                                // if only one last can be inserted then don't queue this we need 1 empty space in queue
                                // for swaping file names..
                                // sleep for some time so processing thread may have made some rooms to queue in fileQueue
                                // this logic will not create any problems as only one this thread is inserting in queue
                                try{
                                    Thread.sleep(200);
                                }catch(InterruptedException ie){
                                }
                                continue;
                            }
                            if(!fileProcessingQueue.offer(child.toString())){
                                // couldn't queue this element by whatever reason.. we will try to enqueue again by continuing loop
                                continue;
                            }else{
                                // file name has been queued in queue
                                break;
                            }
                        }
                    }
                }
                // reset key and remove from set if directory no longer accessible
                boolean valid = key.reset();
                if (!valid) {
                    keys.remove(key);

                    // all directories are inaccessible
                    if (keys.isEmpty()) {
                        break;
                    }
                }
            }catch(ClosedWatchServiceException cwse){
                break;
            }

        }
        closeProcessingThread = true;
        closeWatcherThread = true;
        System.out.println("DirWatcherThread exited.");
    }

    public void stopWatching(){
        try{
            watcher.close();
        }catch(IOException ioe){
        }
        closeProcessingThread = true;
        closeWatcherThread = true;
    }

    public static WatchDir watchDirectory(String dirName, boolean recursive) throws InvalidPathException, IOException, Exception{
        try{
            Path dir = Paths.get(dirName);
            final WatchDir watchDir = new WatchDir(dir, recursive);
            watchDir.closeProcessingThread = false;
            watchDir.closeWatcherThread = false;
            new Thread(new Runnable() {
                public void run() {
                    watchDir.processFiles();
                }
            }, "DirWatchProcessingThread").start();
            new Thread(new Runnable() {
                public void run() {
                    watchDir.processEvents();
                }
            }, "DirWatcherThread").start();
            return watchDir;
        }catch(InvalidPathException ipe){
            throw ipe;
        }catch(IOException ioe){
            throw ioe;
        }catch(Exception e){
            throw e;
        }
    }

    @SuppressWarnings("unchecked")
    private static <T> WatchEvent<T> cast(WatchEvent<?> event) {
        return (WatchEvent<T>)event;
    }

    /**
     * Register the given directory with the WatchService
     */
    private void register(Path dir) throws IOException {
        //WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
        WatchKey key = dir.register(watcher, ENTRY_CREATE);
        if (trace) {
            Path prev = keys.get(key);
            if (prev == null) {
                System.out.format("register: %s\n", dir);
            } else {
                if (!dir.equals(prev)) {
                    System.out.format("update: %s -> %s\n", prev, dir);
                }
            }
        }
        keys.put(key, dir);
    }

    /**
     * Register the given directory, and all its sub-directories, with the
     * WatchService.
     */
    private void registerAll(final Path start) throws IOException {
        // register directory and sub-directories
        Files.walkFileTree(start, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                register(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    /**
     * Creates a WatchService and registers the given directory
     */
    private WatchDir(Path dir, boolean recursive) throws IOException {
        fileProcessingQueue = new ArrayBlockingQueue<String>(WatchDirParameters.fileQueueSize, false);
        processedFileQueue = new ArrayBlockingQueue<String>(WatchDirParameters.fileQueueSize, false);
        this.watcher = FileSystems.getDefault().newWatchService();
        this.keys = new HashMap<WatchKey,Path>();
        this.recursive = recursive;
        //CreateTxtFile.createFile(dir, 1);
        if (recursive) {
            System.out.format("Scanning %s ...\n", dir);
            registerAll(dir);
            System.out.println("Done.");
        } else {
            register(dir);
        }

        // enable trace after initial registration
        this.trace = true;
    }
}

参数类:

package dirwatch;

public class WatchDirParameters {
    public static final int millisToPuaseForFileLock = 200;
    public static final int fileQueueSize = 500;
    public static final int millisToSwapFileForUnlocking = 2000;
}
于 2013-01-03T18:18:59.557 回答
1

制作了@UDPLover 提供的文件的更新版本,该版本专为在高速率文件访问环境中使用而 HashMap<String, WatchEvent>构建本身。还制作了一个 print() 方法,允许启用或禁用由 WatchCore 打印到控制台的任何内容。原始示例中的文件轮询 for 循环已更新为使用 JDK8 函数 for 循环,使所有部分线程化/中断。这还没有经过测试,当我可以测试它时会更新修复。

package filewatcher;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import static java.nio.file.LinkOption.NOFOLLOW_LINKS;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *
 * @author Nackloose
 * http://stackoverflow.com/questions/13998379/directory-watching-for-changes-in-java
 */
public abstract class WatchCore extends Thread {

    //make class a thread by default
    /**
     * After the WatchCore recieves an event for a file and deems it unlocked,
     * it will be passed to this function
     *
     * @param e WatchEvent for the file, after it has been affirmed to be
     * unlocked.
     */
    public abstract void onEventAndUnlocked(WatchEvent e);
    private final WatchService watcher;
    private final Map<WatchKey, Path> keys;
    private final boolean recursive;
    private boolean trace = false;
    //converted to HashMap to remove the limitation as I need this in a high rate of file access enviroment.
    //as well as to carry the event passed for that folder into the block check itself.
    //got rid of the finished queue and made events pass to the abstract void above 
    private final HashMap<String, WatchEvent> fileProcessingQueue;
    //create a varible to keep track of the thread checking the file blocking, so we can start and stop it.
    private final WatchBlocker blocker;

    public WatchCore(String dir) throws IOException {
        //defaultly  dont recurse
        this(dir, false);
    }

    public WatchCore(String dir, boolean recursive) throws IOException {
        this(Paths.get(dir), recursive);

    }

    public WatchCore(Path dir) throws IOException {
        //defaultly  dont recurse
        this(dir, false);
    }

    public WatchCore(Path dir, boolean recursive) throws IOException {
        fileProcessingQueue = new HashMap<>();
        this.watcher = FileSystems.getDefault().newWatchService();
        this.keys = new HashMap<>();
        this.recursive = recursive;
        //CreateTxtFile.createFile(dir, 1);
        if (recursive) {
            print("Scanning %s ...", dir);
            registerAll(dir);
            print("Done.");
        } else {
            register(dir);
        }
        // enable trace after initial registration
        this.trace = true;
        //start the thread to process files to be checked for file blocking
        blocker = new WatchBlocker();
    }

    @SuppressWarnings("unchecked")
    private static <T> WatchEvent<T> cast(WatchEvent<?> event) {
        return (WatchEvent<T>) event;
    }

    @Override
    public synchronized void start() {
        //start up our thread _FIRST_
        super.start();
        //then start the blocking thread
        blocker.start();
    }

    @Override
    public void interrupt() {
        //Everything backwards, stop the blocker _FIRST_
        blocker.interrupt();
        //then stop our thread.
        super.interrupt();
    }

    /**
     * Register the given directory with the WatchService
     */
    private void register(Path dir) throws IOException {
        //WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
        WatchKey key = dir.register(watcher, ENTRY_CREATE);
        if (trace) {
            Path prev = keys.get(key);
            if (prev == null) {
                print("register: %s\n", dir);
            } else {
                if (!dir.equals(prev)) {
                    print("update: %s -> %s\n", prev, dir);
                }
            }
        }
        keys.put(key, dir);
    }

    /**
     * Register the given directory, and all its sub-directories, with the
     * WatchService.
     */
    private void registerAll(final Path start) throws IOException {
        // register directory and sub-directories
        Files.walkFileTree(start, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                register(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    /**
     * Process all events for keys queued to the watcher
     */
    @Override
    public void run() {
        //this was previous called processEvents()
        //pruned any un-nessicary continues, labels, and labels on breaks, a lot of them
        //were redundant
        print("DirWatcherThread started.");
        //as long as we're not interrupted we keep working
        while (!interrupted()) {
            // wait for key to be signalled
            WatchKey key;
            try {
                key = watcher.take();
            } catch (InterruptedException x) {
                // if we are returning from these method, it means we no longer wants to watch directory
                // we must close thread which may be waiting for file names in queue
                continue;
            } catch (ClosedWatchServiceException cwse) {
                break;
            }

            Path dir = keys.get(key);
            if (dir == null) {
                printe("WatchKey not recognized!!");
                continue;
            }

            try {
                //converted to functional for loop.
                key.pollEvents().stream().filter((event) -> {
                    WatchEvent.Kind kind = event.kind();
                    return !(kind == OVERFLOW); //make sure we do the filter
                }).forEach((event) -> {
                    WatchEvent.Kind kind = event.kind();
                    // Context for directory entry event is the file name of entry
                    WatchEvent<Path> ev = cast(event);
                    Path name = ev.context();
                    Path child = dir.resolve(name);
                    if (kind.equals(ENTRY_CREATE)) {
                        // if directory is created, and watching recursively, then
                        // register it and its sub-directories
                        if (recursive) {
                            try {
                                if (Files.isDirectory(child, NOFOLLOW_LINKS)) {
                                    registerAll(child);
                                    return; //continue;
                                }
                            } catch (IOException x) {
                                // ignore to keep sample readbale
                            }
                        }
                        fileProcessingQueue.put(child.toString(), ev);
                    }
                });
                // reset key and remove from set if directory no longer accessible
                boolean valid = key.reset();
                if (!valid) {
                    keys.remove(key);

                    // all directories are inaccessible
                    if (keys.isEmpty()) {
                        break;
                    }
                }
            } catch (ClosedWatchServiceException cwse) {
                break;
            }

        }
        print("DirWatcherThread exited.");
    }

    /**
     *
     * @author
     * http://stackoverflow.com/questions/13998379/directory-watching-for-changes-in-java
     * Nackloose
     */
    private class WatchBlocker extends Thread {

        @Override
        public synchronized void start() {
            //get it going
            super.start();
        }

        @Override
        public void interrupt() {
            //interupt our thread
            super.interrupt();
        }

        @Override
        public void run() {
            //this was perviously processFiles()
            //pruned any un-nessicary continues, labels, and labels on breaks, a lot of them
            //were redundant
            print("DirWatchProcessingThread Started");
            Entry<String, WatchEvent> fileEvent;
            outerLoop:
            //as long as we're not interrupted we keep working
            while (!interrupted()) {
                if (fileProcessingQueue.isEmpty()) {
                    try {
                        Thread.sleep(WatchCoreParameters.timeToIdle);
                    } catch (InterruptedException ex) {
                        Logger.getLogger(WatchCore.class.getName()).log(Level.SEVERE, null, ex);
                    }
                    continue;
                }
                fileEvent = fileProcessingQueue.entrySet().iterator().next();
                fileProcessingQueue.remove(fileEvent.getKey());

                long startTime = System.currentTimeMillis();
                while (true) {
                    FileInputStream fis = null;
                    File file = new File(fileEvent.getKey());
                    try {
                        fis = new FileInputStream(fileEvent.getKey());
                        break;
                    } catch (FileNotFoundException fnfe) {
                        if (!file.exists() || file.isDirectory()) {
                            print("File: '" + fileEvent + "has been deleted in file system or it is not file. Not processing this file.");
                            continue outerLoop;
                        }
                        try {
                            Thread.sleep(WatchCoreParameters.millisToPauseForFileLock);
                        } catch (InterruptedException ie) {
                        }
                        if ((System.currentTimeMillis() - startTime) > WatchCoreParameters.millisToSwapFileForUnlocking) {
                            fileProcessingQueue.put(fileEvent.getKey(), fileEvent.getValue());
                        }
                    } finally {
                        if (fis != null) {
                            try {
                                fis.close();
                            } catch (IOException ioe) {
                                ioe.printStackTrace();
                            }
                        }
                    }
                }
                print("Queuing File: " + fileEvent);
                //pass the unlocked file event to the abstract method
                onEventAndUnlocked(fileEvent.getValue());
            }
            print("DirWatchProcessingThread Exited");
        }
    }

    /**
     *
     * @author
     * http://stackoverflow.com/questions/13998379/directory-watching-for-changes-in-java
     * Nackloose
     */
    public static class WatchCoreParameters {

        public static int timeToIdle = 2000, millisToPauseForFileLock = 200,
                millisToSwapFileForUnlocking = 2000;
        public static boolean verbose = false;

    }

    //<editor-fold defaultstate="collapsed" desc="Printing methods">
    private void print(String s) {
        //defaultly we're not writing an error
        print(s, false);
    }

    public static final void print(String s, boolean error) {
        //check verbosity, exit if none.
        if (!WatchCoreParameters.verbose) {
            return;
        }
        //if this is an error, assign System.err to a temp varible
        //otherise assign System.out for normal printing
        PrintStream out = (!error ? System.out : System.err);
        if (s.contains("\n")) { // check to see if theirs a new line
            out.print(s); //print accordingly
        } else {
            out.println(s); //print accordingly
        }
    }

    public static final void printe(String s) {
        //shortcut/convenience method for printing an error
        print(s, true);
    }

    public static final void print(String s, Object... formatObj) {
        //check verbosity, exit if none.
        if (!WatchCoreParameters.verbose) {
            return;
        }
        //format the object into the string, and if no newline is there, add it.
        System.out.format(s + (s.contains("\n") ? "" : "\n"), formatObj);
    }
//</editor-fold>
}
于 2016-02-09T00:56:41.383 回答