我正在尝试实现一个支持消费者超时的并发生产者-消费者集合(多个生产者和消费者)。
现在实际的集合非常复杂(不幸的是 System.Collections.Concurrent 中没有任何东西可以完成这项工作),但我在这里有一个最小的示例来演示我的问题(看起来有点像BlockingCollection<T>
)。
public sealed class ProducerConsumerQueueDraft<T>
{
private readonly Queue<T> queue = new Queue<T>();
private readonly object locker = new object();
public void Enqueue(T item)
{
lock (locker)
{
queue.Enqueue(item);
/* This "optimization" is broken, as Nicholas Butler points out.
if(queue.Count == 1) // Optimization
*/
Monitor.Pulse(locker); // Notify any waiting consumer threads.
}
}
public T Dequeue(T item)
{
lock (locker)
{
// Surprisingly, this needs to be a *while* and not an *if*
// which is the core of my problem.
while (queue.Count == 0)
Monitor.Wait(locker);
return queue.Dequeue();
}
}
// This isn't thread-safe, but is how I want TryDequeue to look.
public bool TryDequeueDesired(out T item, TimeSpan timeout)
{
lock (locker)
{
if (queue.Count == 0 && !Monitor.Wait(locker, timeout))
{
item = default(T);
return false;
}
// This is wrong! The queue may be empty even though we were pulsed!
item = queue.Dequeue();
return true;
}
}
// Has nasty timing-gymnastics I want to avoid.
public bool TryDequeueThatWorks(out T item, TimeSpan timeout)
{
lock (locker)
{
var watch = Stopwatch.StartNew();
while (queue.Count == 0)
{
var remaining = timeout - watch.Elapsed;
if (!Monitor.Wait(locker, remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining))
{
item = default(T);
return false;
}
}
item = queue.Dequeue();
return true;
}
}
}
这个想法很简单:发现空队列的消费者等待收到信号,而生产者Pulse
(注意:not PulseAll
,这将是低效的)通知他们等待项目。
我的问题是这个属性 Monitor.Pulse
:
当调用 Pulse 的线程释放锁时,就绪队列中的下一个线程(不一定是被脉冲的线程)获取锁。
这意味着消费者线程 C1 可以被生产者线程唤醒以消费项目,但另一个消费者线程 C2 可以在 C1 有机会重新获取锁之前获取锁,并消费项目,让 C1 与获得控制权时的空队列。
这意味着如果队列确实非空,我必须在每个脉冲上防御性地检查消费者代码,如果不是这种情况,则返回并空手等待。
我的主要问题是它效率低下 - 线程可能会被唤醒以完成工作,然后立即被送回再次等待。与此相关的一个结果是,当它应该是优雅的(请参阅参考资料)时,实现TryDequeue
具有超时的 a 是不必要的困难和低效(请参阅参考资料)。TryDequeueThatWorks
TryDequeueDesired
我怎样才能扭转Monitor.Pulse
做我想做的事?或者,是否有另一个同步原语可以做到?TryDequeue
有没有比我做的更有效和/或更优雅的方式来实现超时?
仅供参考,这是一个测试,演示了我想要的解决方案的问题:
var queue = new ProducerConsumerQueueDraft<int>();
for (int consumer = 0; consumer < 3; consumer++)
new Thread(() =>
{
while (true)
{
int item;
// This call should occasionally throw an exception.
// Switching to queue.TryDequeueThatWorks should make
// the problem go away.
if (queue.TryDequeueDesired(out item, TimeSpan.FromSeconds(1)))
{
// Do nothing.
}
}
}).Start();
Thread.Sleep(1000); // Let consumers get up and running
for (int itemIndex = 0; itemIndex < 50000000; itemIndex++)
{
queue.Enqueue(0);
}