1

我已经建立了一个生产者消费者队列,该队列包装了一个.net 4.0的ConcurrentQueue,并在生产(Enqueue)和消费(基于while(true)线程之间发出了SlimManualResetEvent信号。队列看起来像:

public class ProducerConsumerQueue<T> : IDisposable, IProducerConsumerQueue<T>
{
    private bool _IsActive=true;

    public int Count
    {
        get
        {
            return this._workerQueue.Count;
        }
    }

    public bool IsActive
    {
        get { return _IsActive; }
        set { _IsActive = value; }
    }

    public event Dequeued<T> OnDequeued = delegate { };
    public event LoggedHandler OnLogged = delegate { };

    private ConcurrentQueue<T> _workerQueue = new ConcurrentQueue<T>();

    private object _locker = new object();

    Thread[] _workers;

    #region IDisposable Members

    int _workerCount=0;

    ManualResetEventSlim _mres = new ManualResetEventSlim();

    public void Dispose()
    {
        _IsActive = false;

        _mres.Set();

        LogWriter.Write("55555555555");

          for (int i = 0; i < _workerCount; i++)
          // Wait for the consumer's thread to finish.
          {
             _workers[i].Join();        
          }
           LogWriter.Write("6666666666");
     // Release any OS resources.
    }
    public ProducerConsumerQueue(int workerCount)
    {
        try
        {
            _workerCount = workerCount;
            _workers = new Thread[workerCount];
            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (_workers[i] = new Thread(Work)).Start();
        }
        catch (Exception ex)
        {
            OnLogged(ex.Message + ex.StackTrace);
        }

    }
    #endregion

    #region IProducerConsumerQueue<T> Members

    public void EnqueueTask(T task)
    {
        if (_IsActive)
        {
            _workerQueue.Enqueue(task);
            //Monitor.Pulse(_locker);
            _mres.Set();
        }
    }

    public void Work()
    {
      while (_IsActive)
      {
          try
          {
              T item = Dequeue();
              if (item != null)
                  OnDequeued(item);
          }
          catch (Exception ex)
          {
              OnLogged(ex.Message + ex.StackTrace);
          }              
      }
    }

    #endregion
    private T Dequeue()
    {
        try
        {
            T dequeueItem;
            //if (_workerQueue.Count > 0)
            //{
            _workerQueue.TryDequeue(out dequeueItem);
            if (dequeueItem != null)
                return dequeueItem;
            //}
            if (_IsActive)
            {
                _mres.Wait();
                _mres.Reset();
            }
            //_workerQueue.TryDequeue(out dequeueItem);
            return dequeueItem;
        }
        catch (Exception ex)
        {
            OnLogged(ex.Message + ex.StackTrace);
            T dequeueItem;
            //if (_workerQueue.Count > 0)
            //{
            _workerQueue.TryDequeue(out dequeueItem);
            return dequeueItem;
        }

    }


    public void Clear()
    {
        _workerQueue = new ConcurrentQueue<T>();
    }
}

}

当调用 Dispose 时,它​​有时会阻塞连接(一个线程消耗)并且 dispose 方法被卡住。我猜它卡在了 resetEvents 的等待上,但为此我在 dispose 上调用了 set。有什么建议么?

4

2 回答 2

2

更新:我理解您关于内部需要队列的观点。我使用 a 的建议BlockingCollection<T>是基于您的代码包含大量逻辑来提供阻塞行为的事实。自己编写这样的逻辑很容易出现错误(我从经验中知道这一点);因此,当框架中有一个现有的类至少可以为您完成一些工作时,通常最好使用它。

关于如何使用 a 实现此类的完整示例BlockingCollection<T>有点太大,无法包含在此答案中,因此我在 pastebin.com 上发布了一个工作示例;随意看看,看看你的想法。

我还在这里编写了一个示例程序来演示上述示例。

我的代码正确吗?我不会自信地说“是”。毕竟,我没有编写单元测试,对其运行任何诊断等。这只是一个基本的草稿,让您了解如何使用BlockingCollection<T>而不是ConcurrentQueue<T>清理您的大量逻辑(在我看来)并使其更容易关注类的主要目的(从队列中消费项目并通知订阅者),而不是其实现的一些困难方面(内部队列的阻塞行为)。


评论中提出的问题:

你有什么理由不使用BlockingCollection<T>

您的答案:

[...] 我需要一个队列。

的默认构造函数的 MSDN 文档中BlockingCollection<T>

默认的基础集合是一个ConcurrentQueue<T>.

如果您选择实现自己的类而不是使用的唯一BlockingCollection<T>原因是您需要一个 FIFO 队列,那么……您可能需要重新考虑您的决定。使用BlockingCollection<T>默认无参数构造函数实例化的FIFO 队列。

也就是说,虽然我认为我无法对您发布的代码进行全面分析,但我至少可以提供一些建议:

  1. 对于处理这种棘手的多线程行为的类,我会非常犹豫是否以您在这里的方式使用事件。调用代码可以附加它想要的任何事件处理程序,而这些事件处理程序又会抛出异常(您不会捕获)、长时间阻塞,甚至可能由于完全超出您控制的原因而死锁——这在阻塞队列的情况。
  2. Dequeue您的andDispose方法中存在竞争条件。

看看你的Dequeue方法的这些行:

if (_IsActive) // point A
{
    _mres.Wait(); // point C
    _mres.Reset(); // point D
}

现在看看这两行Dispose

_IsActive = false;

_mres.Set(); // point B

假设您有三个线程, T 1、 T 2和 T 3。T 1和 T 2都在A点,每个都在这里检查_IsActive并找到true。然后Dispose被调用,并且 T 3设置_IsActivefalse(但 T 1和 T 2已经通过点A)然后到达点B,在那里它调用_mres.Set()。然后 T 1到达C点,继续移动到D点,并呼叫_mres.Reset()。现在 T 2到达C点并且将永远卡住,因为_mres.Set不会再次被调用(任何正在执行的线程Enqueue都会立即找到_IsActive == false并返回,并且正在执行的线程Dispose已经通过了B点)。

我很乐意尝试为解决这种竞争条件提供一些帮助,但我怀疑这BlockingCollection<T>实际上并不是您需要的课程。如果您可以提供更多信息来说服我事实并非如此,也许我会再看一下。

于 2010-10-08T00:43:29.483 回答
0

由于_IsActive未标记为volatile并且没有lock所有访问权限,因此每个内核都可以为此值具有单独的缓存,并且该缓存可能永远不会被刷新。所以标记_IsActive为 false inDispose实际上不会影响所有正在运行的线程。

http://igoro.com/archive/volatile-keyword-in-c-memory-model-explained/

private volatile bool _IsActive=true;
于 2010-10-08T01:32:41.720 回答