3

我正在尝试使用 ServiceStack-Redis 库提供并在此处描述的锁定机制来实现 DLM ,但我发现 API 似乎存在竞争条件,有时会向多个客户端授予相同的锁。

BasicRedisClientManager mgr = new BasicRedisClientManager(redisConnStr);

using(var client = mgr.GetClient())
{
    client.Remove("touchcount");
    client.Increment("touchcount", 0);
}

Random rng = new Random();

Action<object> simulatedDistributedClientCode = (clientId) => {

    using(var redisClient = mgr.GetClient())
    {
        using(var mylock = redisClient.AcquireLock("mutex", TimeSpan.FromSeconds(2)))
        {
            long touches = redisClient.Get<long>("touchcount");
            Debug.WriteLine("client{0}: I acquired the lock! (touched: {1}x)", clientId, touches);
            if(touches > 0) {
                Debug.WriteLine("client{0}: Oh, but I see you've already been here. I'll release it.", clientId);
                return;
            }
            int arbitraryDurationOfExecutingCode = rng.Next(100, 2500);
            Thread.Sleep(arbitraryDurationOfExecutingCode); // do some work of arbitrary duration
            redisClient.Increment("touchcount", 1);
        }
        Debug.WriteLine("client{0}: Okay, I released my lock, your turn now.", clientId);
    }
};
Action<Task> exceptionWriter = (t) => {if(t.IsFaulted) Debug.WriteLine(t.Exception.InnerExceptions.First());};

int arbitraryDelayBetweenClients = rng.Next(5, 500);
var clientWorker1 = new Task(simulatedDistributedClientCode, 1);
var clientWorker2 = new Task(simulatedDistributedClientCode, 2);

clientWorker1.Start();
Thread.Sleep(arbitraryDelayBetweenClients);
clientWorker2.Start();

Task.WaitAll(
    clientWorker1.ContinueWith(exceptionWriter),
    clientWorker2.ContinueWith(exceptionWriter)
    );

using(var client = mgr.GetClient())
{
    var finaltouch = client.Get<long>("touchcount");
    Console.WriteLine("Touched a total of {0}x.", finaltouch);
}

mgr.Dispose();

当运行上面的代码来模拟两个客户端在短时间内尝试相同的操作时,有三个可能的输出。第一个是互斥体正常工作并且客户端以正确顺序进行的最佳情况。第二种情况是第二个客户端超时等待获取锁;也是可以接受的结果。然而,问题在于,当arbitraryDurationOfExecutingCode接近或超过获取锁的超时时间时,很容易重现第二个客户端在第一个客户端释放锁之前被授予锁的情况,产生如下输出:

client1:我获得了锁!(touched: 0x)
client2: 我获得了锁!(touched: 0x)
client1: 好的,我释放了我的锁,现在轮到你了。
客户 2:好的,我释放了我的锁,现在轮到你了。
一共摸了2x。

我对 API 及其文档的理解是,timeOut获取锁时的参数就是——获取锁的超时时间。如果我必须猜测一个timeOut足够高的值以始终长于执行代码的持续时间以防止出现这种情况,那似乎很容易出错。除了传递 null 来永远等待锁之外,还有其他人可以解决吗?我绝对不想这样做,否则我知道我最终会得到坠毁工人的鬼锁。

4

2 回答 2

3

myz 的回答(感谢您的及时回复!)确认 ServiceStack.Redis 中的内置AcquireLock方法没有区分锁获取期和锁过期期。就我们的目的而言,我们现有的代码预计如果锁定被占用,分布式锁定机制会迅速失败,但允许在锁定范围内长时间运行进程。为了满足这些要求,我在 ServiceStack RedisLock 上派生了这个变体,它允许区分两者。

// based on ServiceStack.Redis.RedisLock
// https://github.com/ServiceStack/ServiceStack.Redis/blob/master/src/ServiceStack.Redis/RedisLock.cs
internal class RedisDlmLock : IDisposable
{
    public static readonly TimeSpan DefaultLockAcquisitionTimeout = TimeSpan.FromSeconds(30);
    public static readonly TimeSpan DefaultLockMaxAge = TimeSpan.FromHours(2);
    public const string LockPrefix = "";    // namespace lock keys if desired

    private readonly IRedisClient _client; // note that the held reference to client means lock scope should always be within client scope

    private readonly string _lockKey;
    private string _lockValue;

    /// <summary>
    /// Acquires a distributed lock on the specified key.
    /// </summary>
    /// <param name="redisClient">The client to use to acquire the lock.</param>
    /// <param name="key">The key to acquire the lock on.</param>
    /// <param name="acquisitionTimeOut">The amount of time to wait while trying to acquire the lock. Defaults to <see cref="DefaultLockAcquisitionTimeout"/>.</param>
    /// <param name="lockMaxAge">After this amount of time expires, the lock will be invalidated and other clients will be allowed to establish a new lock on the same key. Deafults to <see cref="DefaultLockMaxAge"/>.</param>
    public RedisDlmLock(IRedisClient redisClient, string key, TimeSpan? acquisitionTimeOut = null, TimeSpan? lockMaxAge = null)
    {
        _client = redisClient;
        _lockKey = LockPrefix + key;

        ExecExtensions.RetryUntilTrue(
            () =>
            {
                //Modified from ServiceStack.Redis.RedisLock
                //This pattern is taken from the redis command for SETNX http://redis.io/commands/setnx
                //Calculate a unix time for when the lock should expire

                lockMaxAge = lockMaxAge ?? DefaultLockMaxAge; // hold the lock for the default amount of time if not specified.
                DateTime expireTime = DateTime.UtcNow.Add(lockMaxAge.Value);
                _lockValue = (expireTime.ToUnixTimeMs() + 1).ToString(CultureInfo.InvariantCulture);

                //Try to set the lock, if it does not exist this will succeed and the lock is obtained
                var nx = redisClient.SetEntryIfNotExists(_lockKey, _lockValue);
                if (nx)
                    return true;

                //If we've gotten here then a key for the lock is present. This could be because the lock is
                //correctly acquired or it could be because a client that had acquired the lock crashed (or didn't release it properly).
                //Therefore we need to get the value of the lock to see when it should expire
                string existingLockValue = redisClient.Get<string>(_lockKey);
                long lockExpireTime;
                if (!long.TryParse(existingLockValue, out lockExpireTime))
                    return false;
                //If the expire time is greater than the current time then we can't let the lock go yet
                if (lockExpireTime > DateTime.UtcNow.ToUnixTimeMs())
                    return false;

                //If the expire time is less than the current time then it wasn't released properly and we can attempt to 
                //acquire the lock. This is done by setting the lock to our timeout string AND checking to make sure
                //that what is returned is the old timeout string in order to account for a possible race condition.
                return redisClient.GetAndSetEntry(_lockKey, _lockValue) == existingLockValue;
            },
            acquisitionTimeOut ?? DefaultLockAcquisitionTimeout // loop attempting to get the lock for this amount of time.
            );
    }

    public override string ToString()
    {
        return String.Format("RedisDlmLock:{0}:{1}", _lockKey, _lockValue);
    }

    public void Dispose()
    {
        try
        {
            // only remove the entry if it still contains OUR value
            _client.Watch(_lockKey);
            var currentValue = _client.Get<string>(_lockKey);
            if (currentValue != _lockValue)
            {
                _client.UnWatch();
                return;
            }

            using (var tx = _client.CreateTransaction())
            {
                tx.QueueCommand(r => r.Remove(_lockKey));
                tx.Commit();
            }
        }
        catch (Exception ex)
        {
            // log but don't throw
        }
    }
}

为了尽可能简化使用,我还公开了一些扩展方法IRedisClient来并行该AcquireLock方法,如下所示:

internal static class RedisClientLockExtensions
{
    public static IDisposable AcquireDlmLock(this IRedisClient client, string key, TimeSpan timeOut, TimeSpan maxAge)
    {
        return new RedisDlmLock(client, key, timeOut, maxAge);
    }
}
于 2014-10-07T16:59:59.673 回答
2

您的问题突出了ServiceStack.Redis中分布式锁定的行为,如果超过指定的超时,超时客户端将其视为无效锁并尝试自动恢复锁。如果没有自动恢复行为,崩溃的客户端将永远不会释放锁,并且不会允许等待该锁的进一步操作通过。

的锁定行为封装在 RedisLock 类AcquireLock中:

public IDisposable AcquireLock(string key, TimeSpan timeOut)
{
    return new RedisLock(this, key, timeOut);
}

您可以复制并修改它以适应您喜欢的行为:

using (new MyRedisLock(client, key, timeout))
{
    //...
}
于 2014-10-02T20:21:56.240 回答