1

我需要实现一个生产者/消费者有界队列,多个消费者针对一个生产者。

我有一个推送功能,可以将一个项目添加到队列中,然后检查 maxsize。如果我们达到它返回false,在其他情况下返回true。

在以下代码中,_vector 是一个 List<T>,onSignal 基本上以异步方式消费一个项目。

您看到此代码有问题吗?

public bool Push(T message)
{
    bool canEnqueue = true;

    lock (_vector)
    {
        _vector.Add(message);
        if (_vector.Count >= _maxSize)
        {
            canEnqueue = false;
        }
    }

    var onSignal = SignalEvent;
    if (onSignal != null)
    {
        onSignal();
    }

    return canEnqueue;
}
4

2 回答 2

1

我知道你说的是单生产者,多消费者,但无论如何值得一提:如果你的队列几乎满了(比如 25 个插槽中的 24 个),那么如果Push同时有两个线程,你最终会超过限制。如果将来某个时候您甚至有可能拥有多个生产者,您应该考虑进行Push阻塞调用,并让它等待一个“可用” AutoResetEvent,该信号在一个项目出队或一个项目入队之后发出信号仍有可用的插槽。

我看到的唯一其他潜在问题是SignalEvent. 你没有向我们展示它的实现。如果它被声明为public event SignalEventDelegate SignalEvent,那么你会没事的,因为编译器会自动添加一个SynchronizedAttribute. 但是,如果SignalEvent使用带有add/remove语法的后备委托,那么您将需要为事件本身提供自己的锁定,否则消费者可能会为时已晚而从事件中分离出来,但仍会收到一些信号之后。

编辑:实际上,无论如何这是可能的;更重要的是,如果您在没有适当锁定的情况下使用了属性样式的添加/删除委托,那么当您尝试执行委托时,委托实际上可能处于无效状态。即使使用同步事件,消费者也需要准备好在取消订阅后接收(和丢弃)通知。

除此之外,我没有发现任何问题——尽管这并不意味着没有任何问题,只是意味着我没有注意到任何问题。

于 2010-01-31T02:42:21.047 回答
1

我看到的最大问题是使用List<T>来实现队列;这样做存在性能问题,因为删除第一项涉及复制所有数据。

额外的想法;即使您没有添加数据,您也会发出信号,并且使用事件本身可能会出现线程问题(存在一些边缘情况,即使您在null测试之前捕获值 - 加上它可能比使用Monitor来做信号)。

我会切换到一个Queue<T>不会有这个问题的 - 或者更好地使用预滚动的示例;例如在 .NET 中创建阻塞队列?,它完全符合您的讨论,并支持任意数量的生产者和消费者。它使用阻塞方法,但“尝试”方法是:

public bool TryEnqueue(T item)
{
    lock (queue)
    {
        if (queue.Count >= maxSize) { return false; }
        queue.Enqueue(item);
        if (queue.Count == 1)
        {
            // wake up any blocked dequeue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}

最后 - 你不是“推”到堆栈而不是队列吗?

于 2010-01-31T07:22:24.320 回答