2

我在创建一个功能正常的 SystemFileWatcher 时遇到了麻烦,它接收创建的事件并将其存储在队列中以供单独的线程工作。我在这里阅读了无数关于这个问题的帖子,但我无法理解这个特定的问题。

using System;
using System.IO;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections;
using System.Threading;

namespace FileSystemWatcherTest
{
    class Program
    {
        public static BlockingCollection<string> processCollection = new BlockingCollection<string>(new ConcurrentQueue<string>());

    static void Main(string[] args)
    {
        string path = @"C:\test\";
        FileSystemWatcher watcher = new FileSystemWatcher();

        watcher.Path = path;
        watcher.EnableRaisingEvents = true;
        watcher.Filter = "*.*";

        watcher.Created += new FileSystemEventHandler(onCreated);
        Thread Consumer = new Thread(new ThreadStart(mover));
        Consumer.Start();


        while (true) ;//run infinite loop so program doesn't terminate untill we force it.
    }
    static void onCreated(object sender, FileSystemEventArgs e)
    {
        processCollection.Add(e.FullPath);     
    }

    static void mover()
    {
        string current;
        string processed = @"C:\test\processed\";
        while (true)
        {
            while (processCollection.IsCompleted)
            {
                Thread.Sleep(1000);
            }
            while (processCollection.TryTake(out current))
            {
                System.IO.File.Move(current, processed);
            }
        }
    }
}

}

这是我想测试的。我知道这不起作用。当文件放入队列中时,我只是简单地写入控制台,我已经验证了 FSW 的工作原理。当我尝试在它自己的线程中启动移动器功能时,我的问题就开始了。一旦我开始在队列外工作,移动函数和 onCreated 似乎就没有通信了。

我对这段代码的期望是在它自己的线程中启动移动器函数并与 SFW 一起运行它。我的期望是附加到blockingcollection自动更新的并发队列(我通过onCreated将一个项目排入队列,移动器看到它现在对该队列有+1。移动器从队列中取出一个,onCreated看到这个。)我是可能错误地使用了 Thread.Sleep。我不再有使用blockingcollection的支持理由(我最初选择它来处理等待队列填满,并且基本上,不断检查队列以查找要处理的项目)并且愿意将其更改为任何可能的工作。我已经看到了锁的使用,但据我了解,由于 concurrentQueue 是如何同步的,这并不是真正必要的。

最终目标是处理大量随机进入的小文件,在任何给定时间范围从 1 到数百不等。这些文件是 .EML。

如果可能的话,我将非常感谢您解释正在发生的事情以及解决此问题的建议。我谦虚地来,希望被告知我所理解的一切都是不正确的!

编辑:我将其作为控制台应用程序进行测试,但之后将用作服务。我添加了 while (true) ;在 onCreated() 之前保持 FSW 运行。

4

1 回答 1

2

您的代码示例中有几个不同的问题:

  1. 您正在滥用该File.Move()方法。它要求两个参数都是完整的文件。您将目录名称作为第二个参数传递,这是不正确的。
  2. 您正在检查IsCompleted集合的属性,好像这很有用。它将永远是false,因此该代码块什么也不做。这导致了下一个问题……
  3. 你的线程在一个紧密的循环中运行,消耗大量的 CPU 时间。这可能会或可能不会导致错误,但它可能……FileSystemWatcher实际上并不能保证总是报告更改,并且可能无法保证的原因之一是它无法获得足够的 CPU 时间来监视文件系统。如果你用完所有的 CPU 时间而饿死它,你可能会发现它根本没有报告变化。请注意,您的主线程中也存在此问题;它还在一个紧密的循环中运行,无所事事地消耗大量的 CPU 时间。所以你完全占据了你系统的两个核心。
  4. 你没有利用为生产者/消费者BlockingCollection设计的执行模型。您应该让您的工作线程枚举由返回的枚举GetConsumingEnumerable(),使用该CompleteAdding()方法向该线程发出信号,表明不再有工作。

这是您的代码示例的一个版本,它更正了上述错误,并稍微清理了示例,使其更加独立:

// The default backing collection for BlockingCollection<T>
// is ConcurrentQueue<T>. There's no need to specify that
// explicitly.
public static BlockingCollection<string> processCollection = new BlockingCollection<string>();

static void Main(string[] args)
{
    string testDirectory = Path.Combine(Environment.CurrentDirectory, "test");

    Console.WriteLine("Creating directory: \"{0}\"", testDirectory);
    Directory.CreateDirectory(testDirectory);

    FileSystemWatcher watcher = new FileSystemWatcher();

    watcher.Path = testDirectory;
    watcher.EnableRaisingEvents = true;
    watcher.Filter = "*.*";

    watcher.Created += new FileSystemEventHandler(onCreated);
    Thread Consumer = new Thread(new ParameterizedThreadStart(mover));
    Consumer.Start(testDirectory);

    string text;

    while ((text = Console.ReadLine()) != "")
    {
        string newFile = Path.Combine(testDirectory, text + ".txt");

        File.WriteAllText(newFile, "Test file");
    }

    processCollection.CompleteAdding();
}

static void onCreated(object sender, FileSystemEventArgs e)
{
    if (e.ChangeType == WatcherChangeTypes.Created)
    {
        processCollection.Add(e.FullPath);
    }
}

static void mover(object testDirectory)
{
    string processed = Path.Combine((string)testDirectory, "processed");

    Console.WriteLine("Creating directory: \"{0}\"", processed);

    Directory.CreateDirectory(processed);

    foreach (string current in processCollection.GetConsumingEnumerable())
    {
        // Ensure that the file is in fact a file and not something else.
        if (File.Exists(current))
        {
            System.IO.File.Move(current, Path.Combine(processed, Path.GetFileName(current)));
        }
    }
}
于 2015-08-08T00:58:48.920 回答