我正在寻找有关如何将 MP/MC 队列编写为无锁甚至无等待的文档。我正在使用.Net 4.0。找了很多C++代码,但是我对内存模型不是很熟悉,所以很有可能在移植到C#的时候会引入一些bug。


3 回答 3


作为一个可供考虑的选项,Dmitry Vyukov 提供了有界多生产者多消费者队列的算法。我已将该算法移植到 .NET,您可以在 github 上找到源代码。它非常快。


public bool TryEnqueue(object item)
        var buffer = _buffer; // prefetch the buffer pointer
        var pos = _enqueuePos; // fetch the current position where to enqueue the item
        var index = pos & _bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence wasn't touched by other producers
        // and we can increment the enqueue position
        if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
            // write the item we want to enqueue
            Volatile.Write(ref buffer[index].Element, item);
            // bump the sequence
            buffer[index].Sequence = pos + 1;
            return true;

        // If the queue is full we cannot enqueue and just return false
        if (cell.Sequence < pos)
            return false;

        // repeat the process if other producer managed to enqueue before us
    } while (true);


public bool TryDequeue(out object result)
        var buffer = _buffer; // prefetch the buffer pointer
        var bufferMask = _bufferMask; // prefetch the buffer mask
        var pos = _dequeuePos; // fetch the current position from where we can dequeue an item
        var index = pos & bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence was changed by a producer and wasn't changed by other consumers
        // and we can increment the dequeue position
        if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
            // read the item
            result = Volatile.Read(ref cell.Element);
            // update for the next round of the buffer
            buffer[index] = new Cell(pos + bufferMask + 1, null);
            return true;

        // If the queue is empty return false
        if (cell.Sequence < pos + 1)
            result = default(object);
            return false;

        // repeat the process if other consumer managed to dequeue before us
    } while (true);
于 2016-12-13T14:21:48.640 回答

为什么你认为你需要无锁队列?您是否尝试过使用ConcurrentQueue<T>,可能包含在 a 中BlockingCollection<T>


于 2011-05-20T23:01:12.477 回答


我使用 ILSpy 进行了查看,ConcurrentQueue<T>乍一看似乎是一个无锁实现 - 很有可能这正是您正在寻找的。

于 2011-05-20T23:51:34.490 回答