1

我有一个线程,它创建可变数量的工作线程并在它们之间分配任务。这可以通过向线程传递一个TaskQueue对象来解决,您将在下面看到其实现。

这些工作线程简单地遍历给定的TaskQueue对象,执行每个任务。

private class TaskQueue : IEnumerable<Task>
{
    public int Count
    {
        get
        {
            lock(this.tasks)
            {
                return this.tasks.Count;
            }
        }
    }

    private readonly Queue<Task> tasks = new Queue<Task>();
    private readonly AutoResetEvent taskWaitHandle = new AutoResetEvent(false);

    private bool isFinishing = false;
    private bool isFinished = false;

    public void Enqueue(Task task)
    {
        Log.Trace("Entering Enqueue, lock...");
        lock(this.tasks)
        {
            Log.Trace("Adding task, current count = {0}...", Count);
            this.tasks.Enqueue(task);

            if (Count == 1)
            {
                Log.Trace("Count = 1, so setting the wait handle...");
                this.taskWaitHandle.Set();
            }
        }
        Log.Trace("Exiting enqueue...");
    }

    public Task Dequeue()
    {
        Log.Trace("Entering Dequeue...");
        if (Count == 0)
        {
            if (this.isFinishing)
            {
                Log.Trace("Finishing (before waiting) - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }

            Log.Trace("Count = 0, lets wait for a task...");
            this.taskWaitHandle.WaitOne();
            Log.Trace("Wait handle let us through, Count = {0}, IsFinishing = {1}, Returned = {2}", Count, this.isFinishing);

            if(this.isFinishing)
            {
                Log.Trace("Finishing - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }
        }

        Log.Trace("Entering task lock...");
        lock(this.tasks)
        {
            Log.Trace("Entered task lock, about to dequeue next item, Count = {0}", Count);
            return this.tasks.Dequeue();
        }
    }

    public void Finish()
    {
        Log.Trace("Setting TaskQueue state to isFinishing = true and setting wait handle...");
        this.isFinishing = true;

        if (Count == 0)
        {
            this.taskWaitHandle.Set();
        }
    }

    public IEnumerator<Task> GetEnumerator()
    {
        while(true)
        {
            Task t = Dequeue();
            if(this.isFinished)
            {
                yield break;
            }

            yield return t;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

如您所见,我使用AutoResetEvent对象来确保工作线程不会过早退出,即在获得任何任务之前。

简而言之:

  • 主线程通过Enqeueue将任务分配给线程 - 将任务添加到其 TaskQueue
  • 主线程通过调用TaskQueue的Finish ()方法通知线程没有任务要执行
  • 工作线程通过调用TaskQueue的Dequeue ()方法检索下一个分配给它的任务

问题是Dequeue () 方法经常抛出一个 InvalidOperationException,说 Queue 是空的。如您所见,我添加了一些日志记录,结果证明AutoResetEvent不会阻止Dequeue(),即使没有调用其Set()方法。

据我了解,调用 AutoResetEvent.Set() 将允许等待线程继续(之前调用 AutoResetEvent.WaitOne()),然后自动调用 AutoResetEvent.Reset(),阻止下一个服务员。

那么有什么问题呢?我是不是搞错了什么?我在某处有错误吗?我现在在上面坐了 3 个小时,但我不知道出了什么问题。请帮我!

非常感谢!

4

3 回答 3

6

您的出队代码不正确。您检查 Count 是否处于锁定状态,然后飞过裤子的接缝,然后您期望任务会有一些东西。释放锁时不能保留假设:)。您的 Count 检查和 tasks.Dequeue必须在锁定状态下发生:

bool TryDequeue(out Tasks task)
{
  task = null;
  lock (this.tasks) {
    if (0 < tasks.Count) {
      task = tasks.Dequeue();
    }
  }
  if (null == task) {
    Log.Trace ("Queue was empty");
  }
  return null != task;
 }

您的 Enqueue() 代码同样充满了问题。您的入队/出队不能确保进度(即使队列中有项目,您也会有出队线程阻塞等待)。你的签名Enqueue()是错误的。总的来说,你的帖子是非常非常糟糕的代码。坦率地说,我认为你在这里试图咀嚼的东西比你能咬的还多……哦,永远不要在锁定状态下登录

我强烈建议您只使用ConcurrentQueue

如果您无权访问 .Net 4.0,这里有一个帮助您入门的实现:

public class ConcurrentQueue<T>:IEnumerable<T>
{
    volatile bool fFinished = false;
    ManualResetEvent eventAdded = new ManualResetEvent(false);
    private Queue<T> queue = new Queue<T>();
    private object syncRoot = new object();

    public void SetFinished()
    {
        lock (syncRoot)
        {
            fFinished = true;
            eventAdded.Set();
        }
    }

    public void Enqueue(T t)
    {
        Debug.Assert (false == fFinished);
        lock (syncRoot)
        {
            queue.Enqueue(t);
            eventAdded.Set();
        }
    }

    private bool Dequeue(out T t)
    {
        do
        {
            lock (syncRoot)
            {
                if (0 < queue.Count)
                {
                    t = queue.Dequeue();
                    return true;
                }
                if (false == fFinished)
                {
                    eventAdded.Reset ();
                }
            }
            if (false == fFinished)
            {
                eventAdded.WaitOne();
            }
            else
            {
                break;
            }
        } while (true);
        t = default(T);
        return false;
    }


    public IEnumerator<T> GetEnumerator()
    {
        T t;
        while (Dequeue(out t))
        {
            yield return t;
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
于 2010-08-25T17:17:44.090 回答
3

我的更详细的答案正在等待中,但我只想指出一些非常重要的事情。

如果您使用的是 .NET 3.5,则可以使用ConcurrentQueue<T>. 向后移植包含在Rx 扩展库中,可用于 .NET 3.5。

由于您想要阻止行为,您需要将 a 包装ConcurrentQueue<T>在 a 中BlockingCollection<T>(也可作为 Rx 的一部分)。

于 2010-08-25T18:21:04.393 回答
1

看起来您正在尝试复制阻塞队列。一个已经作为BlockingCollection存在于 .NET 4.0 BCL 中。如果 .NET 4.0 不适合您,那么您可以使用此代码。它使用Monitor.WaitandMonitor.Pulse方法而不是AutoResetEvent.

public class BlockingCollection<T>
{
    private Queue<T> m_Queue = new Queue<T>();

    public T Take() // Dequeue
    {
        lock (m_Queue)
        {
            while (m_Queue.Count <= 0)
            {
                Monitor.Wait(m_Queue);
            }
            return m_Queue.Dequeue();
        }
    }

    public void Add(T data) // Enqueue
    {
        lock (m_Queue)
        {
            m_Queue.Enqueue(data);
            Monitor.Pulse(m_Queue);
        }
    }
}

更新:

相当肯定,如果您希望它对多个生产者和多个消费者是线程安全的,那么使用它是不可能实现生产者 - 消费者队列AutoResetEvent的(如果有人能提出反例,我准备被证明是错误的) . 当然,您会在互联网上看到示例,但它们都是错误的。实际上,Microsoft 的一项此类尝试存在缺陷,即队列可能会被实时锁定

于 2010-08-25T17:32:35.693 回答