2

我正在做一个学术开源项目,现在我需要在 C# 中创建一个快速阻塞 FIFO 队列。我的第一个实现只是将一个同步队列(带动态扩展)包装在读取器的信号量中,然后我决定以以下(理论上更快)的方式重新实现

public class FastFifoQueue<T>
{
    private T[] _array;
    private int _head, _tail, _count;
    private readonly int _capacity;
    private readonly Semaphore _readSema, _writeSema;

    /// <summary>
    /// Initializes FastFifoQueue with the specified capacity
    /// </summary>
    /// <param name="size">Maximum number of elements to store</param>
    public FastFifoQueue(int size)
    {
        //Check if size is power of 2
        //Credit: http://stackoverflow.com/questions/600293/how-to-check-if-a-number-is-a-power-of-2
        if ((size & (size - 1)) != 0)
            throw new ArgumentOutOfRangeException("size", "Size must be a power of 2 for this queue to work");

        _capacity = size;
        _array = new T[size];
        _count = 0;
        _head = int.MinValue; //0 is the same!
        _tail = int.MinValue;

        _readSema = new Semaphore(0, _capacity);
        _writeSema = new Semaphore(_capacity, _capacity);
    }

    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index < 0) index += _capacity;
        //_array[index] = item;
        Interlocked.Exchange(ref _array[index], item);
        Interlocked.Increment(ref _count);
        _readSema.Release();
    }

    public T Dequeue()
    {
        _readSema.WaitOne();
        int index = Interlocked.Increment(ref _tail);
        index %= _capacity;
        if (index < 0) index += _capacity;
        T ret = Interlocked.Exchange(ref _array[index], null);
        Interlocked.Decrement(ref _count);
        _writeSema.Release();

        return ret;
    }

    public int Count
    {
        get
        {
            return _count;
        }
    }
}

这是我们在教科书中找到的带有静态数组的经典 FIFO 队列实现。它旨在以原子方式递增指针,并且由于我无法使指针在达到(容量-1)时回到零,因此我计算模数。从理论上讲,使用 Interlocked 与在进行增量之前锁定相同,并且由于存在信号量,因此可以有多个生产者/消费者进入队列,但一次只能修改一个队列指针。首先,因为 Interlocked.Increment 先递增,然后返回,我已经明白我仅限于使用后递增值并从数组中的位置 1 开始存储项目。没问题,到了一定值我就回0了

它有什么问题?您不会相信,在重负载下运行时,有时队列会返回 NULL 值。我确定,重复一遍,我确定,没有任何方法将null排入队列。这绝对是正确的,因为我尝试在 Enqueue 中进行空检查以确保没有抛出任何错误。我用 Visual Studio 为此创建了一个测试用例(顺便说一下,我使用像 maaaaaaaany 人这样的双核 CPU)

    private int _errors;

    [TestMethod()]
    public void ConcurrencyTest()
    {
        const int size = 3; //Perform more tests changing it
        _errors = 0;
        IFifoQueue<object> queue = new FastFifoQueue<object>(2048);
        Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
        Thread[] producers = new Thread[size], consumers = new Thread[size];

        for (int i = 0; i < size; i++)
        {
            producers[i] = new Thread(LoopProducer) { Priority = ThreadPriority.BelowNormal };
            consumers[i] = new Thread(LoopConsumer) { Priority = ThreadPriority.BelowNormal };
            producers[i].Start(queue);
            consumers[i].Start(queue);
        }

        Thread.Sleep(new TimeSpan(0, 0, 1, 0));

        for (int i = 0; i < size; i++)
        {
            producers[i].Abort();
            consumers[i].Abort();
        }

        Assert.AreEqual(0, _errors);
    }

    private void LoopProducer(object queue)
    {
        try
        {
            IFifoQueue<object> q = (IFifoQueue<object>)queue;
            while (true)
            {
                try
                {
                    q.Enqueue(new object());
                }
                catch
                { }

            }
        }
        catch (ThreadAbortException)
        { }
    }

    private void LoopConsumer(object queue)
    {
        try
        {
            IFifoQueue<object> q = (IFifoQueue<object>)queue;
            while (true)
            {
                object item = q.Dequeue();
                if (item == null) Interlocked.Increment(ref _errors);
            }
        }
        catch (ThreadAbortException)
        { }

    }

一旦消费者线程得到一个空值,就会计算一个错误。当使用 1 个生产者和 1 个消费者执行测试时,它成功了。当使用 2 个生产者和 2 个消费者或更多进行测试时,会发生灾难:甚至检测到 2000 次泄漏。我发现问题可能出在 Enqueue 方法中。根据设计合同,生产者只能写入空的单元格(null),但是通过一些诊断修改我的代码我发现有时生产者试图在非空单元格上写入,然后被“好“ 数据。

    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index < 0) index += _capacity;
        //_array[index] = item;
        T leak = Interlocked.Exchange(ref _array[index], item);

        //Diagnostic code
        if (leak != null)
        {
            throw new InvalidOperationException("Too bad...");
        }
        Interlocked.Increment(ref _count);

        _readSema.Release();
    }

然后经常发生“太糟糕”的异常。但是并发写入引发的冲突太奇怪了,因为增量是原子的,并且写入器的信号量只允许与空闲数组单元一样多的写入器。

有人可以帮我吗?如果您与我分享您的技能和经验,我将不胜感激。

谢谢你。

4

3 回答 3

6

我必须说,这让我觉得这是一个非常聪明的想法,在我开始意识到(我认为)错误在哪里之前,我考虑了一段时间。所以,一方面,想出这样一个聪明的设计是值得称赞的!但是,与此同时,您为证明“克尼汉定律”而感到羞耻:

首先,调试的难度是编写代码的两倍。因此,如果您尽可能巧妙地编写代码,那么根据定义,您还不够聪明,无法对其进行调试。

问题基本上是这样的:您假设WaitOneandRelease调用有效地序列化了您的所有EnqueueandDequeue操作;但这并不是这里发生的事情。请记住,Semaphore该类用于限制访问资源的线程数而不是确保事件的特定顺序。每个和之间发生的事情不能保证以与and调用自身相同的“线程顺序”发生。WaitOneReleaseWaitOneRelease

这很难用文字来解释,所以让我试着提供一个视觉说明。

假设您的队列容量为 8,看起来像这样(让我们0表示nullx表示一个对象):

[ xxxxxxx ]

所以Enqueue已被调用 8 次,队列已满。因此,您的_writeSema信号量将阻塞WaitOne,并且您的_readSema信号量将立即返回WaitOne

现在让我们假设Dequeue在 3 个不同的线程上或多或少地同时调用。我们称它们为 T1、T2 和 T3。

在继续之前,让我为您的Dequeue实现应用一些标签,以供参考:

public T Dequeue()
{
    _readSema.WaitOne();                                   // A
    int index = Interlocked.Increment(ref _tail);          // B
    index %= _capacity;
    if (index < 0) index += _capacity;
    T ret = Interlocked.Exchange(ref _array[index], null); // C
    Interlocked.Decrement(ref _count);
    _writeSema.Release();                                  // D

    return ret;
}

好的,所以 T1、T2 和 T3 都经过了 A点。然后为简单起见,假设他们每个人都“按顺序”到达B线,因此 T1 的 anindex为 0,T2 的 anindex为 1,T3 的 anindex为 2。

到现在为止还挺好。但这里有个问题:不能保证从这里开始,T1、T2 和 T3 会以任何指定的顺序到达D行。假设 T3 实际上领先于 T1 和 T2,经过C行(因此设置_array[2]null)并一直移动到D行。

在此之后,_writeSema将发出信号,这意味着您的队列中有一个可用的插槽可以写入,对吗?但是您的队列现在看起来像这样!

[ xx 0 xxxxx ]

因此,如果同时调用另一个线程Enqueue,它实际上会过去 _writeSema.WaitOne,递增_head,并获得index0,即使插槽 0 不是空的。这样做的结果是,在 T1(还记得他吗?)读取它之前,插槽 0 中的项目实际上可能被覆盖。

要了解您的null价值观来自何处,您只需想象我刚才描述的过程的相反过程。也就是说,假设您的队列如下所示:

[ 0 0 0 0 0 0 0 0 ]

三个线程 T1、T2 和 T3Enqueue几乎同时调用。T3 _head last递增但插入其项目 (at _array[2]) 并调用_readSema.Release first,从而产生一个有信号_readSema但看起来像这样的队列:

[ 0 0 x 0 0 0 0 0 ]

因此,如果另一个线程同时调用到Dequeue(在 T1 和 T2 完成他们的事情之前),它将过去_readSema.WaitOne,递增_tail,并获得index0,即使插槽 0的。

所以有你的问题。至于解决方案,我目前没有任何建议。给我一些时间考虑一下......(我现在发布这个答案是因为它在我脑海中很新鲜,我觉得它可能会对你有所帮助。)

于 2010-10-10T02:42:23.443 回答
3

(+1 给我投票的 Dan Tao 有答案)入队会变成这样......

while (Interlocked.CompareExchange(ref _array[index], item, null) != null)
    ;

出队将更改为这样的东西......

while( (ret = Interlocked.Exchange(ref _array[index], null)) == null)
    ;

这建立在丹涛的出色分析之上。因为索引是原子获得的,所以(假设没有线程在 enqueue 或 dequeue 方法中死亡或终止)读者可以保证最终填充他的单元格,或者保证写入者最终可以释放他的单元格(null)。

于 2010-10-10T03:40:14.433 回答
2

谢谢丹涛和莱斯,

我非常感谢您的帮助。丹,你打开了我的思路:关键部分中有多少生产者/消费者并不重要,重要的是锁是按顺序释放的。莱斯,你找到了问题的解决方案。

现在是时候用我在你们俩的帮助下编写的最终代码来最终回答我自己的问题了。嗯,它不多,但它是对 Les 代码的一点增强

入队:

while (Interlocked.CompareExchange(ref _array[index], item, null) != null)
            Thread.Sleep(0);

出队:

while ((ret = Interlocked.Exchange(ref _array[index], null)) == null)
            Thread.Sleep(0);

为什么是 Thread.Sleep(0)?当我们发现无法检索/存储元素时,为什么要立即再次检查?我需要强制上下文切换以允许其他线程读/写。很明显,下一个要调度的线程可能是另一个无法操作的线程,但至少我们强制了。来源: http: //progfeatures.blogspot.com/2009/05/how-to-force-thread-to-perform-context.html

我还测试了前一个测试用例的代码来证明我的主张:

不睡觉(0)

Read 6164150 elements
Wrote 6322541 elements
Read 5885192 elements
Wrote 5785144 elements
Wrote 6439924 elements
Read 6497471 elements

与睡眠(0)

Wrote 7135907 elements
Read 6361996 elements
Wrote 6761158 elements
Read 6203202 elements
Wrote 5257581 elements
Read 6587568 elements

我知道这不是一个“伟大”的发现,我不会因为这些数字而获得图灵奖。性能增量并不显着,但大于零。强制上下文切换允许执行更多的 RW 操作(=更高的吞吐量)。

需要说明的是:在我的测试中,我只是评估队列的性能,而不是模拟生产者/消费者问题,所以不要关心在一分钟后测试结束时队列中是否还有元素。但我只是展示了我的方法,谢谢大家。

可作为 MS-RL 开源的代码:http: //logbus-ng.svn.sourceforge.net/viewvc/logbus-ng/trunk/logbus-core/It.Unina.Dis.Logbus/Utils/FastFifoQueue.cs ?revision =461&view=标记

于 2010-10-12T22:28:23.203 回答