4

在此处输入图像描述我有一个如下的生产者/消费者队列,但我得到了ArgumentWException.

以下是代码:

public class ProducerConsumer<T> where T : class
    {
        #region Private Variables
        private Thread _workerThread;
        private readonly Queue<T> _workQueue;
        private  object _enqueueItemLocker = new object();
        private  object _processRecordLocker = new object();
        private readonly Action<T> _workCallbackAction;
        private AutoResetEvent _workerWaitSignal;
        #endregion

        #region Constructor
        public ProducerConsumer(Action<T> action)
        {
            _workQueue = new Queue<T>();
            _workCallbackAction = action;

        }
        #endregion
        #region Private Methods 
        private void ProcessRecord()
        {
            while (true)
            {               
                T workItemToBeProcessed = default(T);
                bool hasSomeWorkItem = false;
                lock (_processRecordLocker)
                {
                    hasSomeWorkItem = _workQueue.Count > 0;

                    if (hasSomeWorkItem)
                    {
                        workItemToBeProcessed = _workQueue.Dequeue();
                        if (workItemToBeProcessed == null)
                        {
                            return;
                        }
                    }
                }
                if (hasSomeWorkItem)
                {
                    if (_workCallbackAction != null)
                    {
                        _workCallbackAction(workItemToBeProcessed);
                    }
                }
                else
                {
                    _workerWaitSignal.WaitOne();
                }
            }
        }
        #endregion

        #region Public Methods
        /// <summary>
        /// Enqueues work item in the queue.
        /// </summary>
        /// <param name="workItem">The work item.</param>
        public void EnQueueWorkItem(T workItem)
        {
            lock (_enqueueItemLocker)
            {               
                _workQueue.Enqueue(workItem);

                if (_workerWaitSignal == null)
                {
                    _workerWaitSignal = new AutoResetEvent(false);
                }

                _workerWaitSignal.Set();
            }
        }
        /// <summary>
        /// Stops the processer, releases wait handles.
        /// </summary>
        /// <param name="stopSignal">The stop signal.</param>
        public void StopProcesser(AutoResetEvent stopSignal)
        {
            EnQueueWorkItem(null);

            _workerThread.Join();
            _workerWaitSignal.Close();
            _workerWaitSignal = null;

            if (stopSignal != null)
            {
                stopSignal.Set();
            }
        }
        /// <summary>
        /// Starts the processer, starts a new worker thread.
        /// </summary>
        public void StartProcesser()
        {
            if (_workerWaitSignal == null)
            {
                _workerWaitSignal = new AutoResetEvent(false);
            }
            _workerThread = new Thread(ProcessRecord) { IsBackground = true };
            _workerThread.Start();
        }
        #endregion
    }

另一类是:

public class Tester
{
    private readonly ProducerConsumer<byte[]> _proConsumer;
    public Tester()
    {
        _proConsumer = new ProducerConsumer<byte[]>(Display);
    }
    public void AddData(byte[] data)
    {
        try
        {
            _proConsumer.EnQueueWorkItem(recordData);
        }
        catch (NullReferenceException nre)
        {

        }
    }
    public void Start()
    {
        _proConsumer.StartProcesser();
    }

    private static object _recordLocker = new object();

    private void Display(byte[] recordByteStream)
    {
        try
        {
            lock (_recordLocker)
            {
                Console.WriteLine("Done with data:" + BitConverter.ToInt32(recordByteStream, 0));

            }

        }
        catch (Exception ex)
        {

        }

    }
}

我的主要功能:

class Program
    {
        private static Tester _recorder;
        static void Main(string[] args)
        {
            _recorder = new Tester();
            _recorder.StartRecording();

            for (int i = 0; i < 100000; i++)
            {
                _recorder.AddRecordData(BitConverter.GetBytes(i));              
            }

            Console.Read();
        }
    }

知道为什么我会得到异常,我应该怎么做才能避免这种情况?

4

1 回答 1

8

您的类在其当前实现中不是线程安全的。您在Enqueue( lock (_enqueueItemLocker)) 和Dequeue( lock (_processRecordLocker)) 调用中使用了两个不同的对象,这会在Queue<T>.

为了安全地使用队列,您需要在两个调用中锁定相同的对象实例。

如果您使用的是 .NET 4,我建议您使用ConcurrentQueue<T>orBlockingCollection<T>代替,因为这些将消除您的代码中对锁的需求,因为它们是线程安全的。

于 2012-12-11T23:07:31.380 回答