2

这是线程安全队列的实现。push 和 pop 不会互相阻塞。但是,如果队列为空,pop 将等到一个项目被推送。多个生产者和消费者可以使用队列。如果您发现任何问题,请告诉我。

更新:根据答案编辑 1)解决了“队列已满”情况的问题。2).NET4中有BlockingCollection<T>ConcurrentQueue<T>。所以没有必要重新发明轮子(对于.NET4)

public class CustomQueue<T> where T: class
{
    class Node
    {
        public Node()
        {
            Value = null;
            NextNode = null;
        }

        public Node(T value)
        {
            Value = value;
            NextNode = null;
        }

        public T Value;
        public Node NextNode;
    }

    object PushLocker = new object();
    object PopLocker = new object();
    Semaphore QueueSemaphore;
    volatile int PushIncrement;
    volatile int PopIncrement;
    int MaxItems;
    Node FirstNode;
    Node LastNode;

    public CustomQueue(int maxItems)
    {
        QueueSemaphore = new Semaphore(0, maxItems);
        MaxItems = maxItems;
        FirstNode = LastNode = new Node();
        PushIncrement = 0;
        PopIncrement = 0;
    }

    public int Size()
    {
        return PushIncrement - PopIncrement;
    }

    public bool Push(T value)
    {
        lock(PushLocker)
        {
            if((Size()) >= MaxItems)
            {
                lock(PopLocker)
                {
                    PushIncrement = PushIncrement - PopIncrement;
                    PopIncrement = 0;
                    return false;
                }
            }
            Node newNode = new Node(value);                
            LastNode.NextNode = newNode;
            LastNode = newNode;
            PushIncrement++;
            QueueSemaphore.Release();
            return true;
        }            
    }

    public T Pop()
    {
        QueueSemaphore.WaitOne();
        lock(PopLocker)
        {
            Node tempFirst = FirstNode;
            Node tempNext = FirstNode.NextNode;
            T value = tempNext.Value;
            tempNext.Value = null;
            FirstNode = tempNext;
            PopIncrement++;
            return value;
        }
    }
}
4

5 回答 5

3

如果您这样做是为了自我教育,那就太好了-否则BlockingCollection<T>或者ConcurrentQueue<T>是不错的选择。

我在这里看到的一个问题是,一旦它开始就没有办法中断Pop——它假设一个对象在它醒来时正在等待。您将如何在终止时清除此问题?TryPop返回true(带有元素)或(如果没有数据)的Afalse可能会更好,那么您可以发出等待线程在队列耗尽后干净地关闭的信号。

于 2010-11-10T14:33:00.123 回答
3

1.考虑添加第二个 Node 构造函数:

        public Node(T value)
        {
            Value = value;
        }

然后你的客户代码:

            Node newNode = new Node();
            newNode.Value = value;

可以将值视为不变量:

            Node newNode = new Node(value);

2.然后让你的公共领域:

        public T Value;
        public Node NextNode;

进入自动属性:

        public T Value { get; private set; };
        public Node NextNode { get; set; };

因此,您可以从实现中抽象出使用,并在事后添加验证、其他处理等,而对客户端代码的干扰最小。

于 2010-11-10T14:34:42.943 回答
3

乍一看,它看起来像是一个很好的实现。使用不同的锁对我来说总是一个危险信号,所以我仔细研究了一些涉及同时调用Popand的边缘情况Push,它看起来很安全。我怀疑您可能对阻塞队列的链表实现进行了自我教育,对吧?这是安全的原因是因为你只引用和引用,LastNode否则整个技巧就会崩溃。PushFirstNodePop

现在唯一让我感到困惑的是,当您尝试从 a 释放计数时,Semaphore如果它已经满了,它会抛出异常,因此您可能要防止这种情况发生。1否则,您最终会在链表中得到额外的节点,并且队列将 1)拥有超过最大数量的项目,并且 2)它将被活锁。

更新:

我对此进行了更多思考。该方法Release中的调用Push将是非常有问题的。我只看到一种可能的解决方案。从构造函数中删除maxItems参数并允许信号量计数到Int.MaxValue. 否则,您将不得不从根本上改变您的方法,从而导致实现与您当前所拥有的完全不同。

1我想你会发现这比你可能立即意识到的要困难得多。

于 2010-11-10T15:06:09.277 回答
2

如果您有 .NET 4,请参阅此内容:阻塞收集和生产者-消费者问题

于 2010-11-10T14:32:42.060 回答
0

由于我是不可变对象的粉丝,这里有一个替代我之前的答案的替代方案,我认为它更简洁:

public sealed class CustomQueue<T> where T : class
{
    private readonly object pushLocker = new object();
    private readonly object popLocker = new object();
    private readonly Semaphore queueSemaphore;
    private readonly int maxItems;
    private volatile int pushIncrement;
    private volatile int popIncrement;
    private Node firstNode = new Node();
    private Node lastNode;

    public CustomQueue(int maxItems)
    {
        this.maxItems = maxItems;
        this.lastNode = this.firstNode;
        this.queueSemaphore = new Semaphore(0, this.maxItems);
    }

    public int Size
    {
        get
        {
            return this.pushIncrement - this.popIncrement;
        }
    }

    public bool Push(T value)
    {
        lock (this.pushLocker)
        {
            if (this.Size >= this.maxItems)
            {
                lock (this.popLocker)
                {
                    this.pushIncrement = this.pushIncrement - this.popIncrement;
                    this.popIncrement = 0;
                    return false;
                }
            }

            Node newNode = new Node(value, this.lastNode.NextNode);

            this.lastNode = new Node(this.lastNode.Value, newNode);
            this.firstNode = new Node(null, newNode);
            this.pushIncrement++;
            this.queueSemaphore.Release();
            return true;
        }
    }

    public T Pop()
    {
        this.queueSemaphore.WaitOne();
        lock (this.popLocker)
        {
            Node tempNext = this.firstNode.NextNode;
            T value = tempNext.Value;

            this.firstNode = tempNext;
            this.popIncrement++;
            return value;
        }
    }

    private sealed class Node
    {
        private readonly T value;

        private readonly Node nextNode;

        public Node()
        {
        }

        public Node(T value, Node nextNode)
        {
            this.value = value;
            this.nextNode = nextNode;
        }

        public T Value
        {
            get
            {
                return this.value;
            }
        }

        public Node NextNode
        {
            get
            {
                return this.nextNode;
            }
        }
    }
}
于 2010-11-11T15:11:11.767 回答