31

我一直在编写一个监视目录的程序,当在其中创建文件时,它会更改名称并将它们移动到新目录。在我的第一个实现中,我使用了 Java 的 Watch Service API,它在我测试 1kb 文件时运行良好。出现的问题是,实际上创建的文件大小在 50-300mb 之间。发生这种情况时,watcher API 会立即找到该文件,但无法移动它,因为它仍在写入中。我尝试将观察程序置于一个循环中(在文件可以移动之前会产生异常),但这似乎效率很低。

由于这不起作用,我尝试使用一个计时器,该计时器每 10 秒检查一次文件夹,然后在可能的情况下移动文件。这是我最终采用的方法。

问题:在没有进行异常检查或不断比较大小的情况下,是否会在文件写入完成时发出信号?我喜欢对每个文件只使用一次 Watcher API 的想法,而不是不断地使用计时器检查(并遇到异常)。

非常感谢所有回复!

nt

4

13 回答 13

22

我今天遇到了同样的问题。我的用例在实际导入文件之前有一点延迟并不是什么大问题,我仍然想使用 NIO2 API。我选择的解决方案是等到文件没有被修改 10 秒后再对其执行任何操作。

实现的重要部分如下。程序一直等待,直到等待时间到期或发生新事件。每次修改文件时都会重置过期时间。如果一个文件在等待时间到期之前被删除,它将从列表中删除。我使用 poll 方法有一个预期过期时间的超时,即 (lastmodified+waitTime)-currentTime

private final Map<Path, Long> expirationTimes = newHashMap();
private Long newFileWait = 10000L;

public void run() {
    for(;;) {
        //Retrieves and removes next watch key, waiting if none are present.
        WatchKey k = watchService.take();

        for(;;) {
            long currentTime = new DateTime().getMillis();

            if(k!=null)
                handleWatchEvents(k);

            handleExpiredWaitTimes(currentTime);

            // If there are no files left stop polling and block on .take()
            if(expirationTimes.isEmpty())
                break;

            long minExpiration = min(expirationTimes.values());
            long timeout = minExpiration-currentTime;
            logger.debug("timeout: "+timeout);
            k = watchService.poll(timeout, TimeUnit.MILLISECONDS);
        }
    }
}

private void handleExpiredWaitTimes(Long currentTime) {
    // Start import for files for which the expirationtime has passed
    for(Entry<Path, Long> entry : expirationTimes.entrySet()) {
        if(entry.getValue()<=currentTime) {
            logger.debug("expired "+entry);
            // do something with the file
            expirationTimes.remove(entry.getKey());
        }
    }
}

private void handleWatchEvents(WatchKey k) {
    List<WatchEvent<?>> events = k.pollEvents();
    for (WatchEvent<?> event : events) {
        handleWatchEvent(event, keys.get(k));
    }
    // reset watch key to allow the key to be reported again by the watch service
    k.reset();
}

private void handleWatchEvent(WatchEvent<?> event, Path dir) throws IOException {
    Kind<?> kind = event.kind();

    WatchEvent<Path> ev = cast(event);
        Path name = ev.context();
        Path child = dir.resolve(name);

    if (kind == ENTRY_MODIFY || kind == ENTRY_CREATE) {
        // Update modified time
        FileTime lastModified = Attributes.readBasicFileAttributes(child, NOFOLLOW_LINKS).lastModifiedTime();
        expirationTimes.put(name, lastModified.toMillis()+newFileWait);
    }

    if (kind == ENTRY_DELETE) {
        expirationTimes.remove(child);
    }
}
于 2011-01-24T15:03:18.660 回答
11

写入另一个文件作为原始文件已完成的指示。如果完成创建文件“fileorg.done”并仅检查“fileorg.done”,则 Ig“fileorg.dat”正在增长。

使用巧妙的命名约定,您应该不会有问题。

于 2010-07-30T07:24:17.073 回答
9

两种解决方案:

第一个是stacker 答案的轻微变化:

对不完整的文件使用唯一的前缀。类似的东西,myhugefile.zip.inc而不是myhugefile.zip. 上传/创建完成后重命名文件。从手表中排除 .inc 文件。

第二种是使用同一驱动器上的不同文件夹来创建/上传/写入文件,并在准备好后将它们移动到监视文件夹。如果它们在同一个驱动器上(我猜是依赖于文件系统),移动应该是一个原子操作。

无论哪种方式,创建文件的客户端都必须做一些额外的工作。

于 2010-07-30T08:20:44.500 回答
5

看起来 Apache Camel 通过尝试重命名文件 (java.io.File.renameTo) 来处理文件未完成上传问题。如果重命名失败,没有读锁,但继续尝试。当重命名成功时,他们将其重命名,然后继续进行预期的处理。

请参阅下面的操作.renameFile。以下是 Apache Camel 源的链接: GenericFileRenameExclusiveReadLockStrategy.javaFileUtil.java

public boolean acquireExclusiveReadLock( ... ) throws Exception {
   LOG.trace("Waiting for exclusive read lock to file: {}", file);

   // the trick is to try to rename the file, if we can rename then we have exclusive read
   // since its a Generic file we cannot use java.nio to get a RW lock
   String newName = file.getFileName() + ".camelExclusiveReadLock";

   // make a copy as result and change its file name
   GenericFile<T> newFile = file.copyFrom(file);
   newFile.changeFileName(newName);
   StopWatch watch = new StopWatch();

   boolean exclusive = false;
   while (!exclusive) {
        // timeout check
        if (timeout > 0) {
            long delta = watch.taken();
            if (delta > timeout) {
                CamelLogger.log(LOG, readLockLoggingLevel,
                        "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
                // we could not get the lock within the timeout period, so return false
                return false;
            }
        }

        exclusive = operations.renameFile(file.getAbsoluteFilePath(), newFile.getAbsoluteFilePath());
        if (exclusive) {
            LOG.trace("Acquired exclusive read lock to file: {}", file);
            // rename it back so we can read it
            operations.renameFile(newFile.getAbsoluteFilePath(), file.getAbsoluteFilePath());
        } else {
            boolean interrupted = sleep();
            if (interrupted) {
                // we were interrupted while sleeping, we are likely being shutdown so return false
                return false;
            }
        }
   }

   return true;
}
于 2013-07-23T16:18:47.390 回答
4

我知道这是一个老问题,但也许它可以帮助某人。

我有同样的问题,所以我做了以下事情:

if (kind == ENTRY_CREATE) {
            System.out.println("Creating file: " + child);

            boolean isGrowing = false;
            Long initialWeight = new Long(0);
            Long finalWeight = new Long(0);

            do {
                initialWeight = child.toFile().length();
                Thread.sleep(1000);
                finalWeight = child.toFile().length();
                isGrowing = initialWeight < finalWeight;

            } while(isGrowing);

            System.out.println("Finished creating file!");

        }

当文件被创建时,它会变得越来越大。所以我所做的就是比较相隔一秒的重量。该应用程序将处于循环中,直到两个权重相同。

于 2013-03-08T16:09:19.097 回答
3

虽然当 SO 完成复制时不可能由 Watcher Service API 通知,但所有选项似乎都是“解决方法”(包括这个!)。

如上所述,

1) 在 UNIX 上不能选择移动或复制;

2) 如果你有写权限,File.canWrite 总是返回 true,即使文件还在被复制;

3) 等待超时或新事件发生是一种选择,但如果系统过载但复制未完成怎么办?如果超时值很大,程序会等待很长时间。

4)如果您只是在使用文件而不是创建文件,则无法选择将另一个文件写入“标记”复制完成的选项。

另一种方法是使用以下代码:

boolean locked = true;

while (locked) {
    RandomAccessFile raf = null;
    try {
            raf = new RandomAccessFile(file, "r"); // it will throw FileNotFoundException. It's not needed to use 'rw' because if the file is delete while copying, 'w' option will create an empty file.
            raf.seek(file.length()); // just to make sure everything was copied, goes to the last byte
            locked = false;
        } catch (IOException e) {
            locked = file.exists();
            if (locked) {
                System.out.println("File locked: '" + file.getAbsolutePath() + "'");
                Thread.sleep(1000); // waits some time
            } else { 
                System.out.println("File was deleted while copying: '" + file.getAbsolutePath() + "'");
            }
    } finally {
            if (raf!=null) {
                raf.close();    
            }
        }
}
于 2012-10-03T13:37:36.430 回答
0

这是一个非常有趣的讨论,因为这肯定是一个生死攸关的用例:等待一个新文件被创建,然后以某种方式对该文件作出反应。这里的竞争条件很有趣,因为这里的高级要求当然是获取一个事件,然后实际获得(至少)文件的读锁。对于大文件或只是创建大量文件,这可能需要整个工作线程池,它们只是定期尝试锁定新创建的文件,并且当它们成功时,实际完成工作。但我确信 NT 意识到,必须谨慎地进行此操作以使其可扩展,因为它最终是一种轮询方法,而可扩展性和轮询并不是两个可以很好结合的词。

于 2011-01-20T20:00:54.117 回答
0

当我实现一个文件系统观察器来传输上传的文件时,我不得不处理类似的情况。我为解决此问题而实施的解决方案包括以下内容:

1-首先,维护一个未处理文件的映射(只要文件还在被复制,文件系统就会生成Modify_Event,因此如果标志为false,您可以忽略它们)。

2-在您的文件处理器中,您从列表中提取一个文件并检查它是否被文件系统锁定,如果是,您将得到一个异常,只需捕获此异常并将您的线程置于等待状态(即 10 秒)然后重试再次直到锁被释放。处理文件后,您可以将标志更改为 true 或将其从地图中删除。

如果在等待时间段内传输同一文件的多个版本,则此解决方案效率不高。

干杯,拉姆齐

于 2011-12-12T15:00:01.680 回答
0

根据写入完成后您需要移动文件的紧急程度,您还可以检查稳定的最后修改时间戳,并且只移动它被静默的文件。您需要它稳定的时间量可能取决于实现,但我认为具有最后修改时间戳且 15 秒未更改的东西应该足够稳定以便移动。

于 2012-10-03T18:32:56.900 回答
0

对于 linux 中的大文件,文件以 .filepart 的扩展名复制。您只需要使用 commons api 检查扩展并注册 ENTRY_CREATE 事件。我用我的 .csv 文件(1GB)对此进行了测试并添加它有效

public void run()
{
    try
    {
        WatchKey key = myWatcher.take();
        while (key != null)
        {
            for (WatchEvent event : key.pollEvents())
            {
                if (FilenameUtils.isExtension(event.context().toString(), "filepart"))
                {
                    System.out.println("Inside the PartFile " + event.context().toString());
                } else
                {
                    System.out.println("Full file Copied " + event.context().toString());
                    //Do what ever you want to do with this files.
                }
            }
            key.reset();
            key = myWatcher.take();
        }
    } catch (InterruptedException e)
    {
        e.printStackTrace();
    }
}
于 2015-04-20T07:52:14.463 回答
0

如果您无法控制写入过程,请记录所有ENTRY_CREATED事件并观察是否存在模式

在我的例子中,这些文件是通过 WebDav (Apache) 创建的,并且创建了很多临时文件,但同一个文件也触发了两个 ENTRY_CREATED事件。第二个ENTRY_CREATED事件表示复制过程完成。

这是我的示例ENTRY_CREATED事件。打印绝对文件路径(您的日志可能会有所不同,具体取决于写入文件的应用程序):

[info] application - /var/www/webdav/.davfs.tmp39dee1 was created
[info] application - /var/www/webdav/document.docx was created
[info] application - /var/www/webdav/.davfs.tmp054fe9 was created
[info] application - /var/www/webdav/document.docx was created
[info] application - /var/www/webdav/.DAV/__db.document.docx was created 

如您所见,我收到了document.docxENTRY_CREATED的两个事件。在第二个事件之后,我知道文件已完成。在我的情况下,临时文件显然被忽略了。

于 2017-07-25T12:48:25.847 回答
0

所以,我遇到了同样的问题,并为我提供了以下解决方案。早期不成功的尝试 - 尝试监视每个文件的“lastModifiedTime”统计信息,但我注意到大文件的大小增长可能会暂停一段时间。(大小不会连续变化)

基本理念- 对于每个事件,创建一个触发器文件(在临时目录中),其名称格式如下 -

OriginalFileName_lastModifiedTime_numberOfTries

这个文件是空的,所有的戏都只在名字里。原始文件只会在经过特定持续时间的间隔后才会考虑,而不会更改其“最后修改时间”统计信息。(注意 - 因为它是一个文件统计,所以没有开销 -> O(1))

注意- 此触发器文件由不同的服务(例如“ FileTrigger ”)处理。

优势-

  1. 没有睡眠或等待保持系统。
  2. 减轻文件观察者对其他事件的监视

FileWatcher 的代码 -

val triggerFileName: String = triggerFileTempDir + orifinalFileName + "_" + Files.getLastModifiedTime(Paths.get(event.getFile.getName.getPath)).toMillis + "_0"

// creates trigger file in temporary directory
val triggerFile: File = new File(triggerFileName)
val isCreated: Boolean = triggerFile.createNewFile()

if (isCreated)
    println("Trigger created: " + triggerFileName)
else
    println("Error in creating trigger file: " + triggerFileName)

FileTrigger 的代码(间隔时间为 5 分钟的 cron 作业) -

 val actualPath : String = "Original file directory here"
 val tempPath : String = "Trigger file directory here"
 val folder : File = new File(tempPath)    
 val listOfFiles = folder.listFiles()

for (i <- listOfFiles)
{

    // ActualFileName_LastModifiedTime_NumberOfTries
    val triggerFileName: String = i.getName
    val triggerFilePath: String = i.toString

    // extracting file info from trigger file name
    val fileInfo: Array[String] = triggerFileName.split("_", 3)
    // 0 -> Original file name, 1 -> last modified time, 2 -> number of tries

    val actualFileName: String = fileInfo(0)
    val actualFilePath: String = actualPath + actualFileName
    val modifiedTime: Long = fileInfo(1).toLong
    val numberOfTries: Int = fileStats(2).toInt

    val currentModifiedTime: Long = Files.getLastModifiedTime(Paths.get(actualFilePath)).toMillis
    val differenceInModifiedTimes: Long = currentModifiedTime - modifiedTime
    // checks if file has been copied completely(4 intervals of 5 mins each with no modification)
    if (differenceInModifiedTimes == 0 && numberOfTries == 3)
    {
        FileUtils.deleteQuietly(new File(triggerFilePath))
        println("Trigger file deleted. Original file completed : " + actualFilePath)
    }
    else
    {
        var newTriggerFileName: String = null
        if (differenceInModifiedTimes == 0)
        {
            // updates numberOfTries by 1
            newTriggerFileName = actualFileName + "_" + modifiedTime + "_" + (numberOfTries + 1)
        }
        else
        {
            // updates modified timestamp and resets numberOfTries to 0
            newTriggerFileName = actualFileName + "_" + currentModifiedTime + "_" + 0
        }

        // renames trigger file
        new File(triggerFilePath).renameTo(new File(tempPath + newTriggerFileName))
        println("Trigger file renamed: " + triggerFileName + " -> " + newTriggerFileName)
    }    
}
于 2018-05-03T14:59:59.373 回答
-1

我推测 java.io.File.canWrite() 会告诉你文件何时完成写入。

于 2010-07-30T08:42:35.130 回答