我正在做一个学术开源项目,现在我需要在 C# 中创建一个快速阻塞 FIFO 队列。我的第一个实现只是将一个同步队列(带动态扩展)包装在读取器的信号量中,然后我决定以以下(理论上更快)的方式重新实现
public class FastFifoQueue<T>
{
private T[] _array;
private int _head, _tail, _count;
private readonly int _capacity;
private readonly Semaphore _readSema, _writeSema;
/// <summary>
/// Initializes FastFifoQueue with the specified capacity
/// </summary>
/// <param name="size">Maximum number of elements to store</param>
public FastFifoQueue(int size)
{
//Check if size is power of 2
//Credit: http://stackoverflow.com/questions/600293/how-to-check-if-a-number-is-a-power-of-2
if ((size & (size - 1)) != 0)
throw new ArgumentOutOfRangeException("size", "Size must be a power of 2 for this queue to work");
_capacity = size;
_array = new T[size];
_count = 0;
_head = int.MinValue; //0 is the same!
_tail = int.MinValue;
_readSema = new Semaphore(0, _capacity);
_writeSema = new Semaphore(_capacity, _capacity);
}
public void Enqueue(T item)
{
_writeSema.WaitOne();
int index = Interlocked.Increment(ref _head);
index %= _capacity;
if (index < 0) index += _capacity;
//_array[index] = item;
Interlocked.Exchange(ref _array[index], item);
Interlocked.Increment(ref _count);
_readSema.Release();
}
public T Dequeue()
{
_readSema.WaitOne();
int index = Interlocked.Increment(ref _tail);
index %= _capacity;
if (index < 0) index += _capacity;
T ret = Interlocked.Exchange(ref _array[index], null);
Interlocked.Decrement(ref _count);
_writeSema.Release();
return ret;
}
public int Count
{
get
{
return _count;
}
}
}
这是我们在教科书中找到的带有静态数组的经典 FIFO 队列实现。它旨在以原子方式递增指针,并且由于我无法使指针在达到(容量-1)时回到零,因此我计算模数。从理论上讲,使用 Interlocked 与在进行增量之前锁定相同,并且由于存在信号量,因此可以有多个生产者/消费者进入队列,但一次只能修改一个队列指针。首先,因为 Interlocked.Increment 先递增,然后返回,我已经明白我仅限于使用后递增值并从数组中的位置 1 开始存储项目。没问题,到了一定值我就回0了
它有什么问题?您不会相信,在重负载下运行时,有时队列会返回 NULL 值。我确定,重复一遍,我确定,没有任何方法将null排入队列。这绝对是正确的,因为我尝试在 Enqueue 中进行空检查以确保没有抛出任何错误。我用 Visual Studio 为此创建了一个测试用例(顺便说一下,我使用像 maaaaaaaany 人这样的双核 CPU)
private int _errors;
[TestMethod()]
public void ConcurrencyTest()
{
const int size = 3; //Perform more tests changing it
_errors = 0;
IFifoQueue<object> queue = new FastFifoQueue<object>(2048);
Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
Thread[] producers = new Thread[size], consumers = new Thread[size];
for (int i = 0; i < size; i++)
{
producers[i] = new Thread(LoopProducer) { Priority = ThreadPriority.BelowNormal };
consumers[i] = new Thread(LoopConsumer) { Priority = ThreadPriority.BelowNormal };
producers[i].Start(queue);
consumers[i].Start(queue);
}
Thread.Sleep(new TimeSpan(0, 0, 1, 0));
for (int i = 0; i < size; i++)
{
producers[i].Abort();
consumers[i].Abort();
}
Assert.AreEqual(0, _errors);
}
private void LoopProducer(object queue)
{
try
{
IFifoQueue<object> q = (IFifoQueue<object>)queue;
while (true)
{
try
{
q.Enqueue(new object());
}
catch
{ }
}
}
catch (ThreadAbortException)
{ }
}
private void LoopConsumer(object queue)
{
try
{
IFifoQueue<object> q = (IFifoQueue<object>)queue;
while (true)
{
object item = q.Dequeue();
if (item == null) Interlocked.Increment(ref _errors);
}
}
catch (ThreadAbortException)
{ }
}
一旦消费者线程得到一个空值,就会计算一个错误。当使用 1 个生产者和 1 个消费者执行测试时,它成功了。当使用 2 个生产者和 2 个消费者或更多进行测试时,会发生灾难:甚至检测到 2000 次泄漏。我发现问题可能出在 Enqueue 方法中。根据设计合同,生产者只能写入空的单元格(null),但是通过一些诊断修改我的代码我发现有时生产者试图在非空单元格上写入,然后被“好“ 数据。
public void Enqueue(T item)
{
_writeSema.WaitOne();
int index = Interlocked.Increment(ref _head);
index %= _capacity;
if (index < 0) index += _capacity;
//_array[index] = item;
T leak = Interlocked.Exchange(ref _array[index], item);
//Diagnostic code
if (leak != null)
{
throw new InvalidOperationException("Too bad...");
}
Interlocked.Increment(ref _count);
_readSema.Release();
}
然后经常发生“太糟糕”的异常。但是并发写入引发的冲突太奇怪了,因为增量是原子的,并且写入器的信号量只允许与空闲数组单元一样多的写入器。
有人可以帮我吗?如果您与我分享您的技能和经验,我将不胜感激。
谢谢你。