0

我有一个基于 BlockingCollection 的带有生产者消费者模式的简单记录器(代码如下)。

public class Logger
{
    public Logger()
    {
        _messages = new BlockingCollection<LogMessage>(int.MaxValue);
        _worker = new Thread(Work) {IsBackground = true};
        _worker.Start();
    }

    ~Logger()
    {   
        _messages.CompleteAdding();
        _worker.Join();                 // Wait for the consumer's thread to finish.
        //Some logic on closing log file
    }

    /// <summary>
    /// This is message consumer thread
    /// </summary>
    private void Work()
    {
        while (!_messages.IsCompleted)
        {
            //Try to get data from queue
            LogMessage message;
            try
            {
                message = _messages.Take();
            }
            catch (ObjectDisposedException) { break; }    //The BlockingCollection(Of T) has been disposed.
            catch(InvalidOperationException){ continue; } //the BlockingCollection(Of T) is empty and the collection has been marked as complete for adding.

            //... some simple logic to write 'message'
        }
    }
}

问题是应用程序并没有立即结束。结束一个应用程序需要 20-40 秒,如果我在中间使用调试器暂停它,我会看到:
1. GC.Finalize 线程设置在 _worker.Join();
2. _worker 线程在 _messages.Take() 上。

我会等待 _messages.Take() 在 _messages.CompleteAdding(); 之后结束。但看起来不是。

这种终结有什么问题,在这种情况下如何更好地终结工作线程?

PS 我可以简单地删除 _worker.Join() 但随后 Work() 可以向关闭的文件写入一些内容。我的意思是,这是并发的不确定情况。

更新
作为概念证明,我已将 ~Logger() 重命名为 Close() 并在某个时候调用它。它立即关闭记录器。所以 _messages.Take() 在 _messages.CompleteAdding() 之后结束,正如在这种情况下所预期的那样。

我在 GC 线程的高优先级中看到的 ~Logger 中 20-40 秒延迟的唯一解释。能不能有别的解释?

4

1 回答 1

3

在 C# 中,终结器(又名析构函数)是不确定的,这意味着您无法预测何时调用它们或以何种顺序调用它们。例如,在您的代码中,_worker 的终结器完全有可能在Logger的终结器之后。出于这个原因,您永远不应该在终结器中访问托管对象(例如 FileStreams 等),因为其他托管资源的终结器可能已经完成,从而使它们的引用无效。此外,直到 GC 确定需要收集(由于需要额外的内存)之后,才会调用终结器。在您的情况下,GC 可能需要 20-40 秒才能进行所需的收集。

您想要做的是摆脱终结器并改用 IDisposable 接口(可选地使用可能提供更好可读性的 Close() 方法)。

然后,您只需logger.Close()在不再需要时调用。

void IDisposable.Dispose()
{   
     Close();
}

void Close() 
{
    _messages.CompleteAdding();
    _worker.Join(); // Wait for the consumer's thread to finish.
    //Some logic on closing log file
}

通常,只有在需要清理非托管资源时才使用终结器(例如,如果您正在使用 P/Invoke WinAPI 函数调用等)。如果您只使用 .Net 类等,您可能没有任何理由使用一个。IDisposable 几乎总是更好的选择,因为它提供了确定性清理。

有关终结器与析构函数的更多信息,请查看此处: 在 C# 中使用 IDisposable 与析构函数有什么区别?

我将对您的代码进行的另一项更改是使用 TryTake 而不是 Take。这消除了对 try/catch 的需要,因为当集合为空并调用 CompleteAdding 时它不会抛出异常。它只会返回false。

private void Work()
{
    //Try to get data from queue
    LogMessage message;
    while (_messages.TryTake(out message, Timeout.Infinite))
       //... some simple logic to write 'message'       
}

您在代码捕获的两个异常仍然可能由于其他原因发生,例如在处理它之后访问它或修改 BlockingCollection 的基础集合(有关更多信息,请参阅MSDN)。但是这些都不应该出现在您的代码中,因为您不持有对基础集合的引用,并且您不会在 Work 函数完成之前处理 BlockingCollection。如果您仍然想捕获这些异常,以防万一,您可以在 while 循环之外放置一个 try/catch 块(因为您不想在任何一个异常发生后继续循环)。

最后,为什么要指定 int.MaxValue 作为集合的容量?除非您希望定期向集合中添加接近那么多的消息,否则您不应该这样做。

所以总的来说,我会重写你的代码如下:

public class Logger : IDisposable
{
    private BlockingCollection<LogMessage> _messages = null;
    private Thread _worker = null;
    private bool _started = false;

    public void Start() 
    {
        if (_started) return;
        //Some logic to open log file
        OpenLogFile();      
        _messages = new BlockingCollection<LogMessage>();  //int.MaxValue is the default upper-bound
        _worker = new Thread(Work) { IsBackground = true };
        _worker.Start();
        _started = true;
    }

    public void Stop()
    {   
        if (!_started) return;

        // prohibit adding new messages to the queue, 
        // and cause TryTake to return false when the queue becomes empty.
        _messages.CompleteAdding();

        // Wait for the consumer's thread to finish.
        _worker.Join();  

        //Dispose managed resources
        _worker.Dispose();
        _messages.Dispose();

        //Some logic to close log file
        CloseLogFile(); 

        _started = false;
    }

    /// <summary>
    /// Implements IDiposable 
    /// In this case, it is simply an alias for Stop()
    /// </summary>
    void IDisposable.Dispose() 
    {
        Stop();
    }

    /// <summary>
    /// This is message consumer thread
    /// </summary>
    private void Work()
    {
        LogMessage message;
        //Try to get data from queue
        while(_messages.TryTake(out message, Timeout.Infinite))
            WriteLogMessage(message); //... some simple logic to write 'message'
    }
}

如您所见,我添加了启用/禁用队列处理的方法Start()Stop()如果需要,您可以从构造函数调用 Start(),但通常,您可能不希望在构造函数中进行昂贵的操作(例如线程创建)。我使用 Start/Stop 而不是 Open/Close,因为它似乎对记录器更有意义,但这只是个人喜好,任何一对都可以正常工作。正如我之前提到的,您甚至不必使用 Stop 或 Close 方法。只需添加 Dispose() 就足够了,但某些类(如Streams 等)使用 Close 或 Stop 作为 Dispose 的别名,只是为了使代码更具可读性。

于 2012-12-27T05:51:41.973 回答