2

我有以下课程:

public class AtomicLong
{
    private long initial;
    private long value;

    public AtomicLong(long value = 0)
    {
        this.initial = value;
        this.value = value;
    }

    public class Handle : IDisposable
    {
        private AtomicLong source;
        private long amount;

        public Handle(AtomicLong source, long amount)
        {
            this.source = source;
            this.amount = amount;
        }

        public void Dispose()
        {
            if (source == null)
                return;
            Interlocked.Add(ref source.value, amount);
            source = null;
        }
    }

    public Handle Claim(long amount)
    {
        if (amount > initial)
            throw new ArgumentOutOfRangeException("amount", amount, "Must be no more than the initial amount.");
        if (amount < 0)
            throw new ArgumentOutOfRangeException("amount", amount, "Must be nonnegative.");
        while (true)
        {
            var oldValue = Interlocked.Read(ref value);
            var newValue = oldValue - amount;
            if (newValue >= 0 &&
                oldValue == Interlocked.CompareExchange(ref value, newValue, oldValue))
            {
                return new Handle(this, amount);
            }
        }
    }
}

一个示例用法是,我可以有一个AtomicLong unusedMemory代表一组工作人员当前可用的内存字节数的单个。(这并不意味着接近精确 - 这只是一个粗略的衡量标准。)然后我在一堆不同的工作线程上执行此操作:

while (true)
{
    var unitOfWork = WaitForUnitOfWork();
    long requiredMemory = unitOfWork.RequiredMemory;
    using (var handle = unusedMemory.Claim(requiredMemory))
    {
        //wait until requireMemory can be claimed from unusedMemory
        //do work with reserved memory, represented by handle
        //when handle disposes, memory is released back to the unusedMemory
    }
}

AtomicLong班的问题是呼叫Claim将忙,直到他们返回。我想通过使用某种操作系统级别的等待句柄抽象来解决这个问题。

你能建议我怎么做吗?


动机

考虑以下场景:

  • usedMemory 的初始值为 10GB ( 10 << 30)
  • 100 个工作线程
  • 10 个工作单元,每个需要 10GB 和 1 分钟来执行
  • 第一个工人打电话Claim(10 << 30),它几乎立即返回
    • 它开始做将在 1 分钟后完成的工作
  • 大约 9 名其他工作人员拨打了相同的电话Claim(10 << 30)并进行了“糟糕”的忙等待 1 分钟
    • 9 个线程在方法中执行类似while(true){/*do nothing*/}循环的操作Claim
    • 大量不必要的 CPU 使用率
  • WaitForUnitOfWork()其余的工人(90)在方法中做一个“好的”操作系统级等待

重要的一点: 只有在请求的内存实际上可以Claim申请时才“便宜” 。amount如果不是,则在可用之前一直等待。

为了完全清楚,在该Claim方法中,我指出了导致所有差异的确切表达式(newValue >= 0):

while (true)
{
    var oldValue = Interlocked.Read(ref value);
    var newValue = oldValue - amount;
    if (newValue >= 0 && // <--------------------------- THIS IS THE PROBLEM
        oldValue == Interlocked.CompareExchange(ref value, newValue, oldValue))
    {
        return new Handle(this, amount);
    }
}

问题不在于是否Interlocked.CompareExchange会很贵——我知道它很便宜。问题是当amount调用者想要的Claim当前大于amount.AtomicLong


如果你有完全不同的方法来解决这类问题,或者看到我已经拥有的一些缺陷,我也想听听!

4

2 回答 2

0

你有几个选择。

例如,您可以通过让活动线程在给定的时间间隔内休眠来创建更智能的忙等待,因此它并不总是检查您的状况,而是定期检查。

另一种解决方案是创建一个自定义事件并在您的活动线程中等待该事件,您可以定义一个自定义事件来完成我相信的任务。

您可以在此处阅读有关事件的更多信息。您可以在此处阅读有关自定义事件创建的信息。

于 2013-05-21T22:53:18.680 回答
0

这是我想出的解决方案:

参数

  • obj: 使用的同步对象Monitor
  • pollIterval:调用将轮询的速率,直到成功的事务尝试
  • value: 被事务修改的值
  • precondition: 在事务开始时必须为真的可选条件
  • transform: 改变值的操作
  • postcondition: 在事务结束时必须为真的可选条件

代码

public static class AtomicHelper
{
    public static void LongTransaction(
        object obj,
        TimeSpan pollInterval,
        ref long value,
        Func<long, bool> precondition,
        Func<long, long> transform,
        Func<long, bool> postcondition)
    {
        while (true)
        {
            var oldValue = Interlocked.Read(ref value);
            if (precondition != null && !precondition(oldValue))
            {
                Monitor.Wait(obj, pollInterval);
                continue;
            }
            var newValue = transform(oldValue);
            if (postcondition != null && !postcondition(newValue))
            {
                Monitor.Wait(obj, pollInterval);
                continue;
            }
            if (Interlocked.CompareExchange(ref value, newValue, oldValue) == oldValue)
            {
                Monitor.PulseAll(obj);
                return;
            }
        }
    }
}

示例用法

long n = 10;
object obj = new object();
//On many different threads, run this concurrently:
AtomicHelper.LongTransaction(
    obj,
    TimeSpan.FromSeconds(1),
    ref n,
    null,
    x => x - 1,
    x => x >= 0);
Thread.Sleep(TimeSpan.FromSeconds(3));
AtomicHelper.LongTransaction(
    obj,
    TimeSpan.Zero,
    ref n,
    null,
    x => x + 1,
    null);
于 2013-05-24T21:50:51.653 回答