2

我正在尝试实现一个支持消费者超时的并发生产者-消费者集合(多个生产者和消费者)。

现在实际的集合非常复杂(不幸的是 System.Collections.Concurrent 中没有任何东西可以完成这项工作),但我在这里有一个最小的示例来演示我的问题(看起来有点像BlockingCollection<T>)。

public sealed class ProducerConsumerQueueDraft<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly object locker = new object();

    public void Enqueue(T item)
    {
        lock (locker)
        {
            queue.Enqueue(item);

            /* This "optimization" is broken, as Nicholas Butler points out.
            if(queue.Count == 1) // Optimization
            */
                Monitor.Pulse(locker); // Notify any waiting consumer threads.
        }
    }

    public T Dequeue(T item)
    {
        lock (locker)
        {
            // Surprisingly, this needs to be a *while* and not an *if*
            // which is the core of my problem.
            while (queue.Count == 0)
                Monitor.Wait(locker);

            return queue.Dequeue();
        }
    }

    // This isn't thread-safe, but is how I want TryDequeue to look.
    public bool TryDequeueDesired(out T item, TimeSpan timeout)
    {
        lock (locker)
        {
            if (queue.Count == 0 && !Monitor.Wait(locker, timeout))
            {
                item = default(T);
                return false;
            }

            // This is wrong! The queue may be empty even though we were pulsed!
            item = queue.Dequeue();
            return true;
        }
    }

    // Has nasty timing-gymnastics I want to avoid.
    public bool TryDequeueThatWorks(out T item, TimeSpan timeout)
    {
        lock (locker)
        {
            var watch = Stopwatch.StartNew();
            while (queue.Count == 0)
            {
                var remaining = timeout - watch.Elapsed;

                if (!Monitor.Wait(locker, remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining))
                {
                    item = default(T);
                    return false;
                }
            }
            item = queue.Dequeue();
            return true;
        }
    }
}

这个想法很简单:发现空队列的消费者等待收到信号,而生产者Pulse(注意:not PulseAll,这将是低效的)通知他们等待项目。

我的问题是这个属性 Monitor.Pulse

当调用 Pulse 的线程释放锁时,就绪队列中的下一个线程(不一定是被脉冲的线程)获取锁。

这意味着消费者线程 C1 可以被生产者线程唤醒以消费项目,但另一个消费者线程 C2 可以在 C1 有机会重新获取锁之前获取锁,并消费项目,让 C1 与获得控制权时的空队列。

这意味着如果队列确实非空,我必须在每个脉冲上防御性地检查消费者代码,如果不是这种情况,则返回并空手等待。

我的主要问题是它效率低下 - 线程可能会被唤醒以完成工作,然后立即被送回再次等待。与此相关的一个结果是,当它应该是优雅的(请参阅参考资料)时,实现TryDequeue具有超时的 a 是不必要的困难和低效(请参阅参考资料)。TryDequeueThatWorksTryDequeueDesired

我怎样才能扭转Monitor.Pulse做我想做的事?或者,是否有另一个同步原语可以做到?TryDequeue有没有比我做的更有效和/或更优雅的方式来实现超时?

仅供参考,这是一个测试,演示了我想要的解决方案的问题:

var queue = new ProducerConsumerQueueDraft<int>();

for (int consumer = 0; consumer < 3; consumer++)
    new Thread(() =>
    {
        while (true)
        {
            int item;

            // This call should occasionally throw an exception.
            // Switching to queue.TryDequeueThatWorks should make
            // the problem go away.
            if (queue.TryDequeueDesired(out item, TimeSpan.FromSeconds(1)))
            {
                // Do nothing.
            }
        }

    }).Start();

Thread.Sleep(1000); // Let consumers get up and running

for (int itemIndex = 0; itemIndex < 50000000; itemIndex++)
{
    queue.Enqueue(0);
}
4

3 回答 3

2

我的主要问题是它效率低下

它不是。您认为这是一种常见的情况,但这种比赛很少发生。一次在蓝月亮,充其量。while 循环是必要的,以确保它发生时不会出错。它会的。不要惹它。

事实上恰恰相反,锁设计是有效的,因为它确实允许发生竞争。并处理它。修补锁定设计非常危险,因为比赛发生的频率不够高。它们非常随机,无法进行足够的测试来证明更改不会导致失败。添加任何检测代码也不起作用,它会改变时间。

于 2012-05-20T15:54:04.193 回答
1

这是一个简单的基于键的合并生产者-消费者队列:

public class ConflatingConcurrentQueue<TKey, TValue>
{
    private readonly ConcurrentDictionary<TKey, Entry> entries;
    private readonly BlockingCollection<Entry> queue;

    public ConflatingConcurrentQueue()
    {
        this.entries = new ConcurrentDictionary<TKey, Entry>();
        this.queue = new BlockingCollection<Entry>();
    }

    public void Enqueue(TValue value, Func<TValue, TKey> keySelector)
    {
        // Get the entry for the key. Create a new one if necessary.
        Entry entry = entries.GetOrAdd(keySelector(value), k => new Entry());

        // Get exclusive access to the entry.
        lock (entry)
        {
            // Replace any old value with the new one.
            entry.Value = value;

            // Add the entry to the queue if it's not enqueued yet.
            if (!entry.Enqueued)
            {
                entry.Enqueued = true;
                queue.Add(entry);
            }
        }
    }

    public bool TryDequeue(out TValue value, TimeSpan timeout)
    {
        Entry entry;

        // Try to dequeue an entry (with timeout).
        if (!queue.TryTake(out entry, timeout))
        {
            value = default(TValue);
            return false;
        }

        // Get exclusive access to the entry.
        lock (entry)
        {
            // Return the value.
            value = entry.Value;

            // Mark the entry as dequeued.
            entry.Enqueued = false;
            entry.Value = default(TValue);
        }

        return true;
    }

    private class Entry
    {
        public TValue Value { get; set; }
        public bool Enqueued { get; set; }
    }
}

(这可能需要一两次代码审查,但我认为总的来说这是理智的。)

于 2012-05-20T15:59:07.083 回答
1

我写了一篇关于此的文章,可能会有所帮助:

线程同步:Wait 和 Pulse 揭秘

特别是,它解释了为什么需要while循环。

于 2012-05-20T16:00:42.200 回答