我有一个线程,它创建可变数量的工作线程并在它们之间分配任务。这可以通过向线程传递一个TaskQueue对象来解决,您将在下面看到其实现。
这些工作线程简单地遍历给定的TaskQueue对象,执行每个任务。
private class TaskQueue : IEnumerable<Task>
{
public int Count
{
get
{
lock(this.tasks)
{
return this.tasks.Count;
}
}
}
private readonly Queue<Task> tasks = new Queue<Task>();
private readonly AutoResetEvent taskWaitHandle = new AutoResetEvent(false);
private bool isFinishing = false;
private bool isFinished = false;
public void Enqueue(Task task)
{
Log.Trace("Entering Enqueue, lock...");
lock(this.tasks)
{
Log.Trace("Adding task, current count = {0}...", Count);
this.tasks.Enqueue(task);
if (Count == 1)
{
Log.Trace("Count = 1, so setting the wait handle...");
this.taskWaitHandle.Set();
}
}
Log.Trace("Exiting enqueue...");
}
public Task Dequeue()
{
Log.Trace("Entering Dequeue...");
if (Count == 0)
{
if (this.isFinishing)
{
Log.Trace("Finishing (before waiting) - isCompleted set, returning empty task.");
this.isFinished = true;
return new Task();
}
Log.Trace("Count = 0, lets wait for a task...");
this.taskWaitHandle.WaitOne();
Log.Trace("Wait handle let us through, Count = {0}, IsFinishing = {1}, Returned = {2}", Count, this.isFinishing);
if(this.isFinishing)
{
Log.Trace("Finishing - isCompleted set, returning empty task.");
this.isFinished = true;
return new Task();
}
}
Log.Trace("Entering task lock...");
lock(this.tasks)
{
Log.Trace("Entered task lock, about to dequeue next item, Count = {0}", Count);
return this.tasks.Dequeue();
}
}
public void Finish()
{
Log.Trace("Setting TaskQueue state to isFinishing = true and setting wait handle...");
this.isFinishing = true;
if (Count == 0)
{
this.taskWaitHandle.Set();
}
}
public IEnumerator<Task> GetEnumerator()
{
while(true)
{
Task t = Dequeue();
if(this.isFinished)
{
yield break;
}
yield return t;
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
如您所见,我使用AutoResetEvent对象来确保工作线程不会过早退出,即在获得任何任务之前。
简而言之:
- 主线程通过Enqeueue将任务分配给线程 - 将任务添加到其 TaskQueue
- 主线程通过调用TaskQueue的Finish ()方法通知线程没有任务要执行
- 工作线程通过调用TaskQueue的Dequeue ()方法检索下一个分配给它的任务
问题是Dequeue () 方法经常抛出一个 InvalidOperationException,说 Queue 是空的。如您所见,我添加了一些日志记录,结果证明AutoResetEvent不会阻止Dequeue(),即使没有调用其Set()方法。
据我了解,调用 AutoResetEvent.Set() 将允许等待线程继续(之前调用 AutoResetEvent.WaitOne()),然后自动调用 AutoResetEvent.Reset(),阻止下一个服务员。
那么有什么问题呢?我是不是搞错了什么?我在某处有错误吗?我现在在上面坐了 3 个小时,但我不知道出了什么问题。请帮我!
非常感谢!