1

由于多线程上下文中的某些速率限制,我需要在 N 个不同连接之间循环一些调用。我决定使用一个列表和一个“计数器”来实现这个功能,它应该在每次调用的实例之间“跳一个”。

我将用一个最小的例子来说明这个概念(使用一个名为 A 的类来代表连接)

class A
{
    public A()
    {
        var newIndex = Interlocked.Increment(ref index);
        ID = newIndex.ToString();
    }
    private static int index;
    public string ID;
}

static int crt = 0;
static List<A> Items = Enumerable.Range(1, 15).Select(i => new A()).ToList();
static int itemsCount = Items.Count;

static A GetInstance()
{            
    var newIndex = Interlocked.Increment(ref crt);
    var instance = Items[newIndex % itemsCount];
    //Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
    return instance;
}

static void Test()
{
    var sw = Stopwatch.StartNew();

    var tasks = Enumerable.Range(1, 1000000).Select(i => Task.Run(GetInstance)).ToArray();
    Task.WaitAll(tasks);
}

这按预期工作,因为它确保调用在连接之间是循环的。我可能会在“真实”代码中坚持这个实现(计数器使用 long 而不是 int)

但是,即使在我的用例中不太可能达到 int.MaxValue ,我想知道是否有办法“安全地溢出”计数器。

我知道 C# 中的“%”是“余数”而不是“模数”,这意味着一些 ?: 体操需要始终返回正数,这是我想避免的。

所以我想提出的是:

static A GetInstance()
{            
    var newIndex = Interlocked.Increment(ref crt);
    Interlocked.CompareExchange(ref crt, 0, itemsCount); //?? the return value is the original value, how to know if it succeeded
    var instance = Items[newIndex];
    //Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
    return instance;
}

我期望的是Interlocked.CompareExchange(ref crt, 0, itemsCount)只有一个线程“赢得”,一旦达到可用连接数,将计数器设置回 0。但是,我不知道如何在这种情况下使用它。

可以在这里使用 CompareExchange 或 Interlocked 中的其他机制吗?

4

3 回答 3

3

你可能可以:

static int crt = -1;
static readonly IReadOnlyList<A> Items = Enumerable.Range(1, 15).Select(i => new A()).ToList();
static readonly int itemsCount = Items.Count;
static readonly int maxItemCount = itemsCount * 100;

static A GetInstance()
{
    int newIndex;

    while (true)
    {
        newIndex = Interlocked.Increment(ref crt);

        if (newIndex >= itemsCount)
        {
            while (newIndex >= itemsCount && Interlocked.CompareExchange(ref crt, -1, newIndex) != newIndex)
            {
                // There is an implicit memory barrier caused by the Interlockd.CompareExchange around the
                // next line
                // See for example https://afana.me/archive/2015/07/10/memory-barriers-in-dot-net.aspx/
                // A full memory barrier is the strongest and interesting one. At least all of the following generate a full memory barrier implicitly:
                // Interlocked class mehods
                newIndex = crt;
            }

            continue;
        }

        break;
    }

    var instance = Items[newIndex % itemsCount];
    //Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
    return instance;
}

但是我不得不说实话……我不确定它是否正确(应该是),并且很难解释它,如果任何人以任何方式触摸它都会破坏它。

基本思想是为crt(我们不想溢出,它会破坏一切......所以我们想让 veeeeeery 远离int.MaxValue,或者你可以使用uint)有一个“低”的上限。

最大可能值为:

maxItemCount = (int.MaxValue - MaximumNumberOfThreads) / itemsCount * itemsCount;

/ itemsCount * itemsCount是因为我们希望轮次平均分配。在我给出的示例中,我使用了一个可能低得多的数字itemsCount * 100(线程。如果它们是仅使用 cpu 的非常小的线程,则重置很慢,但如果不是,则不是)。

然后当我们溢出这个天花板时,我们试图将它移回-1(我们的起点)。我们知道,同时其他糟糕的坏线程可能Interlocked.Increment会在此重置时产生竞争。由于Interlocked.CompareExchange只有一个线程可以成功重置计数器,但其他赛车线程会立即看到这一点并中断他们的尝试。

嗯...if可以重写为:

if (newIndex >= itemsCount)
{
    int newIndex2;
    while (newIndex >= itemsCount && (newIndex2 = Interlocked.CompareExchange(ref crt, 0, newIndex)) != newIndex)
    {
        // If the Interlocked.CompareExchange is successfull, the while will end and so we won't be here,
        // if it fails, newIndex2 is the current value of crt
        newIndex = newIndex2;
    }

    continue;
}
于 2021-02-04T17:42:59.590 回答
1

不,Interlocked该类没有提供任何机制来允许您将Int32值恢复为零以防溢出。原因是两个线程可能同时调用该var newIndex = Interlocked.Increment(ref crt);语句,在这种情况下,两个线程都会溢出计数器,然后没有一个线程会成功将值更新回零。这个功能只是超出了Interlocked类的能力。要使此类复杂操作原子化,您需要使用其他一些同步机制,例如lock.


更新: xanatos 的回答证明上述说法是错误的。这个9 年前的问题的答案也证明了它是错误的。下面是一个InterlockedIncrementRoundRobin方法的两个实现。第一个是这个答案的简化版本,作者 Alex Sorokoletov:

public static int InterlockedRoundRobinIncrement(ref int location, int modulo)
{
    // Arguments validation omitted (the modulo should be a positive number)
    uint current = unchecked((uint)Interlocked.Increment(ref location));
    return (int)(current % modulo);
}

这种实现非常有效,但它的缺点是支持int值不能直接使用,因为它循环遍历Int32类型的整个范围(包括负值)。可用信息来自方法本身的返回值,它保证在范围内[0..modulo]。如果您想读取当前值而不增加它,您将需要另一个类似的方法来执行相同的int -> uint -> int转换:

public static int InterlockedRoundRobinRead(ref int location, int modulo)
{
    uint current = unchecked((uint)Volatile.Read(ref location));
    return (int)(current % modulo);
}

它还有一个缺点,即每 4,294,967,296 增量一次,除非modulo是 2 的幂,否则它会0在达到该值之前过早返回一个modulo - 1值。换句话说,翻转逻辑在技术上存在缺陷。这可能是一个大问题,也可能不是一个大问题,具体取决于应用程序。

第二种实现是 xanatos算法的修改版本:

public static int InterlockedRoundRobinIncrement(ref int location, int modulo)
{
    // Arguments validation omitted (the modulo should be a positive number)
    while (true)
    {
        int current = Interlocked.Increment(ref location);
        if (current >= 0 && current < modulo) return current;

        // Overflow. Try to zero the number.
        while (true)
        {
            int current2 = Interlocked.CompareExchange(ref location, 0, current);
            if (current2 == current) return 0; // Success
            current = current2;
            if (current >= 0 && current < modulo)
            {
                break; // Another thread zeroed the number. Retry increment.
            }
        }
    }
}

这效率稍低(尤其是对于小modulo值),因为有时Interlocked.Increment操作会导致超出范围的值,并且该值被拒绝并重复操作。尽管在此方法的某些调用期间,除了一些非常短暂的时间跨度外,它确实具有支持int值保持在该范围内的优势。[0..modulo]

于 2021-02-04T14:31:41.333 回答
0

使用 CompareExchange 的替代方法是简单地让值溢出。

我已经对此进行了测试并且无法证明它是错误的(到目前为止),但这并不意味着它不是。

 //this incurs some cost, but "should" ensure that the int range
 // is mapped to the unit range (int.MinValue is mapped to 0 in the uint range)
 static ulong toPositive(int i) => (uint)1 + long.MaxValue + (uint)i;

 static A GetInstance()
 {
    //this seems to overflow safely without unchecked
    var newCounter = Interlocked.Increment(ref crt);
    //convert the counter to a list index, that is map the unsigned value
    //to a signed range and get the value modulus the itemCount value
    var newIndex = (int)(toPositive(newCounter) % (ulong)itemsCount);
    var instance = Items[newIndex];
    //Console.WriteLine($"{DateTime.Now.Ticks}, Got instance: {instance.ID}");
    return instance;
 }

PS:我的问题的另一部分 xy 问题部分:在朋友的建议下,我目前正在调查使用 LinkedList 或类似的东西(带锁)来实现相同的目的。

于 2021-02-04T22:12:56.197 回答