3

我对 FileSystemWatcher 有疑问。我在 Windows 服务中使用它来监视某些文件夹,当复制文件时,它使用 SSIS 包处理该文件。一切正常,但 FileWatcher 时不时地拾取同一个文件并在无限循环中多次触发 Created 事件。下面的代码工作如下:

首先,此方法由 Windows 服务调用并创建一个观察者:

    private void CreateFileWatcherEvent(SSISPackageSetting packageSettings)
    {   
      // Create a new FileSystemWatcher and set its properties.
      FileSystemWatcher watcher = new FileSystemWatcher();

      watcher.IncludeSubdirectories = false;

      watcher.Path = packageSettings.FileWatchPath;

      /* Watch for changes in LastAccess and LastWrite times, and
        the renaming of files or directories. */

      watcher.NotifyFilter = NotifyFilters.LastAccess | NotifyFilters.LastWrite
        | NotifyFilters.FileName | NotifyFilters.DirectoryName | NotifyFilters.Size;

      //Watch for all files
      watcher.Filter = "*.*";

      watcher.Created += (s, e) => FileCreated(e, packageSettings);

      // Begin watching.
      watcher.EnableRaisingEvents = true;
    }

接下来,Watcher.Created 事件看起来像这样:

    private void FileCreated(FileSystemEventArgs e, SSISPackageSetting packageSettings)
    {
        //Bunch of other code not important to the issue

        ProcessFile(packageSettings, e.FullPath, fileExtension);
    }

ProcessFile 方法如下所示:

    private void ProcessFile(SSISPackageSetting packageSetting,string Filename,string fileExtension)
    {
        //COMPLETE A BUNCH OF SSIS TASKS TO PROCESS THE FILE


        //NOW WE NEED TO CREATE THE OUTPUT FILE SO THAT SSIS CAN WRITE TO IT      
        string errorOutPutfileName = packageSetting.ImportFailurePath + @"\FailedRows" + System.DateTime.Now.ToFileTime() + packageSetting.ErrorRowsFileExtension;
        File.Create(errorOutPutfileName).Close();

        MoveFileToSuccessPath(Filename, packageSetting);              
    }

最后,MoveFile 方法如下所示:

    private void MoveFileToSuccessPath(string filename, SSISPackageSetting ssisPackage)
    {
        try 
        {
            string newFilename = MakeFilenameUnique(filename);
            System.IO.File.Move(filename, ssisPackage.ArchivePath.EndsWith("\\") ? ssisPackage.ArchivePath + newFilename : ssisPackage.ArchivePath + "\\" + newFilename);

        }
        catch (Exception ex)
        {
            SaveToApplicationLog(string.Format
                ("Error ocurred while moving a file to the success path. Filename {0}. Archive Path {1}. Error {2}", filename, ssisPackage.ArchivePath,ex.ToString()), EventLogEntryType.Error);
        }            
    }

所以在那里的某个地方,我们进入了一个无限循环,并且 FileWatcher 继续拾取同一个文件。有人知道吗?这会随机且间歇性地发生。

4

1 回答 1

2

使用 FileSystemWatcher 时,我倾向于使用字典将文件添加到通知事件触发时。然后我有一个单独的线程,它使用一个计时器从这个集合中挑选文件,当它们超过几秒时,大约 5 秒。

如果我的处理也可能改变最后一次访问时间并且我也观察到了,那么我还实现了一个校验和,我将它与文件名和每个文件的最后处理时间一起保存在字典中,并使用它来抑制它在中多次触发一排。您不必使用昂贵的计算,我已经使用 md5 甚至 crc32 - 您只是试图防止多个通知。

编辑

这个示例代码是非常具体的情况,并做出了许多您可能需要更改的假设。它没有列出您的所有代码,只是像您需要添加的位一样:

// So, first thing to do is add a dictionary to store file info:

internal class FileWatchInfo
{
    public DateTime LatestTime { get; set; }
    public bool IsProcessing { get; set; }
    public string FullName { get; set; }
    public string Checksum { get; set; }
}
SortedDictionary<string, FileWatchInfo> fileInfos = new SortedDictionary<string, FileWatchInfo>();
private readonly object SyncRoot = new object();

//  Now, when you set up the watcher, also set up a [`Timer`][1] to monitor that dictionary.

CreateFileWatcherEvent(new SSISPackageSetting{ FileWatchPath = "H:\\test"});
int processFilesInMilliseconds = 5000;
Timer timer = new Timer(ProcessFiles, null, processFilesInMilliseconds, processFilesInMilliseconds);

// In FileCreated, don't process the file but add it to a list

private void FileCreated(FileSystemEventArgs e) {
    var finf = new FileInfo(e.FullPath);
    DateTime latest = finf.LastAccessTimeUtc > finf.LastWriteTimeUtc
        ? finf.LastAccessTimeUtc : finf.LastWriteTimeUtc;
    latest = latest > finf.CreationTimeUtc ? latest : finf.CreationTimeUtc;
    //  Beware of issues if other code sets the file times to crazy times in the past/future

    lock (SyncRoot) {
        //  You need to work out what to do if you actually need to add this file again (i.e. someone
        //  has edited it in the 5 seconds since it was created, and the time it took you to process it)
        if (!this.fileInfos.ContainsKey(e.FullPath)) {
            FileWatchInfo info = new FileWatchInfo {
                FullName = e.FullPath,
                LatestTime = latest,
                IsProcessing = false, Processed = false,
                Checksum = null
            };
            this.fileInfos.Add(e.FullPath, info);
        }
    }
}

最后,这是现在的处理方法

    private void ProcessFiles(object state) {
        FileWatchInfo toProcess = null;
        List<string> toRemove = new List<string>();
        lock (this.SyncRoot) {
            foreach (var info in this.fileInfos) {
                //  You may want to sort your list by latest to avoid files being left in the queue for a long time
                if (info.Value.Checksum == null) {
                    //  If this fires the watcher, it doesn't matter, but beware of big files,
                    //     which may mean you need to move this outside the lock
                    string md5Value;
                    using (var md5 = MD5.Create()) {
                        using (var stream = File.OpenRead(info.Value.FullName)) {
                            info.Value.Checksum =
                                BitConverter.ToString(md5.ComputeHash(stream)).Replace("-", "").ToLower();
                        }
                    }
                }
                //  Data store (myFileInfoStore) is code I haven't included - use a Dictionary which you remove files from
                //   after a few minutes, or a permanent database to store file checksums
                if ((info.Value.Processed && info.Value.ProcessedTime.AddSeconds(5) < DateTime.UtcNow) 
                   || myFileInfoStore.GetFileInfo(info.Value.FullName).Checksum == info.Value.Checksum) {
                    toRemove.Add(info.Key);
                }
                else if (!info.Value.Processed && !info.Value.IsProcessing 
                    && info.Value.LatestTime.AddSeconds(5) < DateTime.UtcNow) {
                    info.Value.IsProcessing = true;
                    toProcess = info.Value;
                    //  This processes one file at a time, you could equally add a bunch to a list for parallel processing
                    break;
                }
            }
            foreach (var filePath in toRemove) {
                this.fileInfos.Remove(filePath);
            }
        }
        if (toProcess != null)
        {
            ProcessFile(packageSettings, toProcess.FullName, new FileInfo(toProcess.FullName).Extension); 
        }
    }

最后,ProcessFile需要处理您的文件,完成后进入锁,将fileInfos字典中的信息标记为Processed,设置ProcessedTime,然后退出锁并移动文件。如果校验和在经过可接受的时间后发生变化,您还需要更新校验和。

由于我对您的情况一无所知,因此很难提供完整的示例,但这是我使用的一般模式。您将需要考虑文件速率、更新频率等。您可以将时间间隔降低到亚秒而不是 5 秒,但仍然可以。

于 2013-05-28T06:21:53.767 回答