34

我试图找出我的 ImageProcessor 库在此处向缓存添加项目时出现间歇性文件访问错误的问题。

System.IO.IOException:该进程无法访问文件“D:\home\site\wwwroot\app_data\cache\0\6\5\f\2\7\065f27fc2c8e843443d210a1e84d1ea28bbab6c4.webp”,因为它正被另一个进程使用.

我编写了一个类,旨在根据散列 url 生成的密钥执行异步锁定,但似乎我在实现中遗漏了一些东西。

我的锁定课

public sealed class AsyncDuplicateLock
{
    /// <summary>
    /// The collection of semaphore slims.
    /// </summary>
    private static readonly ConcurrentDictionary<object, SemaphoreSlim> SemaphoreSlims
                            = new ConcurrentDictionary<object, SemaphoreSlim>();

    /// <summary>
    /// Locks against the given key.
    /// </summary>
    /// <param name="key">
    /// The key that identifies the current object.
    /// </param>
    /// <returns>
    /// The disposable <see cref="Task"/>.
    /// </returns>
    public IDisposable Lock(object key)
    {
        DisposableScope releaser = new DisposableScope(
        key,
        s =>
        {
            SemaphoreSlim locker;
            if (SemaphoreSlims.TryRemove(s, out locker))
            {
                locker.Release();
                locker.Dispose();
            }
        });

        SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));
        semaphore.Wait();
        return releaser;
    }

    /// <summary>
    /// Asynchronously locks against the given key.
    /// </summary>
    /// <param name="key">
    /// The key that identifies the current object.
    /// </param>
    /// <returns>
    /// The disposable <see cref="Task"/>.
    /// </returns>
    public Task<IDisposable> LockAsync(object key)
    {
        DisposableScope releaser = new DisposableScope(
        key,
        s =>
        {
            SemaphoreSlim locker;
            if (SemaphoreSlims.TryRemove(s, out locker))
            {
                locker.Release();
                locker.Dispose();
            }
        });

        Task<IDisposable> releaserTask = Task.FromResult(releaser as IDisposable);
        SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));

        Task waitTask = semaphore.WaitAsync();

        return waitTask.IsCompleted
                   ? releaserTask
                   : waitTask.ContinueWith(
                       (_, r) => (IDisposable)r,
                       releaser,
                       CancellationToken.None,
                       TaskContinuationOptions.ExecuteSynchronously,
                       TaskScheduler.Default);
    }

    /// <summary>
    /// The disposable scope.
    /// </summary>
    private sealed class DisposableScope : IDisposable
    {
        /// <summary>
        /// The key
        /// </summary>
        private readonly object key;

        /// <summary>
        /// The close scope action.
        /// </summary>
        private readonly Action<object> closeScopeAction;

        /// <summary>
        /// Initializes a new instance of the <see cref="DisposableScope"/> class.
        /// </summary>
        /// <param name="key">
        /// The key.
        /// </param>
        /// <param name="closeScopeAction">
        /// The close scope action.
        /// </param>
        public DisposableScope(object key, Action<object> closeScopeAction)
        {
            this.key = key;
            this.closeScopeAction = closeScopeAction;
        }

        /// <summary>
        /// Disposes the scope.
        /// </summary>
        public void Dispose()
        {
            this.closeScopeAction(this.key);
        }
    }
}

用法 - 在 HttpModule 中

private readonly AsyncDuplicateLock locker = new AsyncDuplicateLock();

using (await this.locker.LockAsync(cachedPath))
{
    // Process and save a cached image.
}

谁能发现我哪里出错了?我担心我误解了一些基本的东西。

该库的完整源代码存储在此处的 Github 上

4

5 回答 5

62

正如其他回答者指出的那样,原始代码在释放信号量之前SemaphoreSlim从中删除 。ConcurrentDictionary所以,你有太多的信号量搅动 - 当它们仍然可以使用时,它们被从字典中删除(没有被获取,但已经从字典中检索)。

这种“映射锁”的问题是很难知道何时不再需要信号量。一种选择是根本不释放信号量;这是一个简单的解决方案,但在您的场景中可能不可接受。另一种选择 - 如果信号量实际上与对象实例相关而不是值(如字符串) - 是使用 ephemerons 附加它们;但是,我相信这个选项在您的场景中也是不可接受的。

所以,我们很难做到。:)

有几种不同的方法可行。我认为从引用计数的角度来处理它是有意义的(引用计数字典中的每个信号量)。另外,我们想让递减计数和删除操作原子化,所以我只使用一个lock(使并发字典变得多余):

public sealed class AsyncDuplicateLock
{
  private sealed class RefCounted<T>
  {
    public RefCounted(T value)
    {
      RefCount = 1;
      Value = value;
    }

    public int RefCount { get; set; }
    public T Value { get; private set; }
  }

  private static readonly Dictionary<object, RefCounted<SemaphoreSlim>> SemaphoreSlims
                        = new Dictionary<object, RefCounted<SemaphoreSlim>>();

  private SemaphoreSlim GetOrCreate(object key)
  {
    RefCounted<SemaphoreSlim> item;
    lock (SemaphoreSlims)
    {
      if (SemaphoreSlims.TryGetValue(key, out item))
      {
        ++item.RefCount;
      }
      else
      {
        item = new RefCounted<SemaphoreSlim>(new SemaphoreSlim(1, 1));
        SemaphoreSlims[key] = item;
      }
    }
    return item.Value;
  }

  public IDisposable Lock(object key)
  {
    GetOrCreate(key).Wait();
    return new Releaser { Key = key };
  }

  public async Task<IDisposable> LockAsync(object key)
  {
    await GetOrCreate(key).WaitAsync().ConfigureAwait(false);
    return new Releaser { Key = key };
  }

  private sealed class Releaser : IDisposable
  {
    public object Key { get; set; }

    public void Dispose()
    {
      RefCounted<SemaphoreSlim> item;
      lock (SemaphoreSlims)
      {
        item = SemaphoreSlims[Key];
        --item.RefCount;
        if (item.RefCount == 0)
          SemaphoreSlims.Remove(Key);
      }
      item.Value.Release();
    }
  }
}
于 2015-07-02T21:06:18.103 回答
1

对于给定的键,

  1. 线程 1 调用GetOrAdd并添加一个新信号量并通过以下方式获取它Wait
  2. 线程 2 调用GetOrAdd并获取现有信号量并阻塞Wait
  3. 线程 1 仅在调用 后才释放信号量,TryRemove这从字典中删除了信号量
  4. 线程 2 现在获取信号量。
  5. 线程 3 调用GetOrAdd与线程 1 和 2 相同的键。线程 2 仍然持有信号量,但信号量不在字典中,因此线程 3 创建一个新信号量,线程 2 和 3 都访问相同的受保护资源。

你需要调整你的逻辑。只有在没有服务员的情况下,才应将信号量从字典中删除。

这是一种潜在的解决方案,减去异步部分:

public sealed class AsyncDuplicateLock
{
    private class LockInfo
    {
        private SemaphoreSlim sem;
        private int waiterCount;

        public LockInfo()
        {
            sem = null;
            waiterCount = 1;
        }

        // Lazily create the semaphore
        private SemaphoreSlim Semaphore
        {
            get
            {
                var s = sem;
                if (s == null)
                {
                    s = new SemaphoreSlim(0, 1);
                    var original = Interlocked.CompareExchange(ref sem, null, s);
                    // If someone else already created a semaphore, return that one
                    if (original != null)
                        return original;
                }
                return s;
            }
        }

        // Returns true if successful
        public bool Enter()
        {
            if (Interlocked.Increment(ref waiterCount) > 1)
            {
                Semaphore.Wait();
                return true;
            }
            return false;
        }

        // Returns true if this lock info is now ready for removal
        public bool Exit()
        {
            if (Interlocked.Decrement(ref waiterCount) <= 0)
                return true;

            // There was another waiter
            Semaphore.Release();
            return false;
        }
    }

    private static readonly ConcurrentDictionary<object, LockInfo> activeLocks = new ConcurrentDictionary<object, LockInfo>();

    public static IDisposable Lock(object key)
    {
        // Get the current info or create a new one
        var info = activeLocks.AddOrUpdate(key,
          (k) => new LockInfo(),
          (k, v) => v.Enter() ? v : new LockInfo());

        DisposableScope releaser = new DisposableScope(() =>
        {
            if (info.Exit())
            {
                // Only remove this exact info, in case another thread has
                // already put its own info into the dictionary
                ((ICollection<KeyValuePair<object, LockInfo>>)activeLocks)
                  .Remove(new KeyValuePair<object, LockInfo>(key, info));
            }
        });

        return releaser;
    }

    private sealed class DisposableScope : IDisposable
    {
        private readonly Action closeScopeAction;

        public DisposableScope(Action closeScopeAction)
        {
            this.closeScopeAction = closeScopeAction;
        }

        public void Dispose()
        {
            this.closeScopeAction();
        }
    }
}
于 2015-06-30T12:29:09.813 回答
1

这是一个KeyedLock不太方便且更容易出错的课程,但也比 Stephen Cleary 的课程分配更少AsyncDuplicateLock。它在内部维护一个 s 池,SemaphoreSlim在它们被前一个键释放后,可以被任何键重用。池的容量是可配置的,默认为 10。

这个类不是免分配的,因为SemaphoreSlim每次由于争用而无法同步获取信号量时,该类都会分配内存(实际上很多)。

锁可以同步和异步请求,也可以取消和超时请求。这些特性是通过利用SemaphoreSlim类的现有功能来提供的。

public class KeyedLock<TKey>
{
    private readonly Dictionary<TKey, (SemaphoreSlim, int)> _perKey;
    private readonly Stack<SemaphoreSlim> _pool;
    private readonly int _poolCapacity;

    public KeyedLock(IEqualityComparer<TKey> keyComparer = null, int poolCapacity = 10)
    {
        _perKey = new Dictionary<TKey, (SemaphoreSlim, int)>(keyComparer);
        _pool = new Stack<SemaphoreSlim>(poolCapacity);
        _poolCapacity = poolCapacity;
    }

    public async Task<bool> WaitAsync(TKey key, int millisecondsTimeout,
        CancellationToken cancellationToken = default)
    {
        var semaphore = GetSemaphore(key);
        bool entered = false;
        try
        {
            entered = await semaphore.WaitAsync(millisecondsTimeout,
                cancellationToken).ConfigureAwait(false);
        }
        finally { if (!entered) ReleaseSemaphore(key, entered: false); }
        return entered;
    }

    public Task WaitAsync(TKey key, CancellationToken cancellationToken = default)
        => WaitAsync(key, Timeout.Infinite, cancellationToken);

    public bool Wait(TKey key, int millisecondsTimeout,
        CancellationToken cancellationToken = default)
    {
        var semaphore = GetSemaphore(key);
        bool entered = false;
        try { entered = semaphore.Wait(millisecondsTimeout, cancellationToken); }
        finally { if (!entered) ReleaseSemaphore(key, entered: false); }
        return entered;
    }

    public void Wait(TKey key, CancellationToken cancellationToken = default)
        => Wait(key, Timeout.Infinite, cancellationToken);

    public void Release(TKey key) => ReleaseSemaphore(key, entered: true);

    private SemaphoreSlim GetSemaphore(TKey key)
    {
        SemaphoreSlim semaphore;
        lock (_perKey)
        {
            if (_perKey.TryGetValue(key, out var entry))
            {
                int counter;
                (semaphore, counter) = entry;
                _perKey[key] = (semaphore, ++counter);
            }
            else
            {
                lock (_pool) semaphore = _pool.Count > 0 ? _pool.Pop() : null;
                if (semaphore == null) semaphore = new SemaphoreSlim(1, 1);
                _perKey[key] = (semaphore, 1);
            }
        }
        return semaphore;
    }

    private void ReleaseSemaphore(TKey key, bool entered)
    {
        SemaphoreSlim semaphore; int counter;
        lock (_perKey)
        {
            if (_perKey.TryGetValue(key, out var entry))
            {
                (semaphore, counter) = entry;
                counter--;
                if (counter == 0)
                    _perKey.Remove(key);
                else
                    _perKey[key] = (semaphore, counter);
            }
            else
            {
                throw new InvalidOperationException("Key not found.");
            }
        }
        if (entered) semaphore.Release();
        if (counter == 0)
        {
            Debug.Assert(semaphore.CurrentCount == 1);
            lock (_pool) if (_pool.Count < _poolCapacity) _pool.Push(semaphore);
        }
    }
}

使用示例:

var locker = new KeyedLock<string>();

await locker.WaitAsync("Hello");
try
{
    await DoSomethingAsync();
}
finally
{
    locker.Release("Hello");
}

该实现使用元组解构,至少需要 C# 7。

该类KeyedLock可以很容易地修改为KeyedSemaphore,这将允许每个键多个并发操作。它只需要maximumConcurrencyPerKey构造函数中的一个参数,该参数将被存储并传递给SemaphoreSlims 的构造函数。


注意:该类SemaphoreSlim在被滥用时会抛出一个SemaphoreFullException. 当信号量被释放的次数多于被获取的次数时,就会发生这种情况。这个答案的KeyedLock实现在滥用的情况下表现不同:它抛出一个InvalidOperationException("Key not found."). 发生这种情况是因为当一个键被释放的次数与它被获取的次数一样多时,相关的信号量就会从字典中删除。如果这个实现曾经抛出一个SemaphoreFullException,这将是一个错误的指示。

于 2020-12-11T17:43:58.987 回答
0

我用这个重写了@StephenCleary 的答案:

public sealed class AsyncLockList {

    readonly Dictionary<object, SemaphoreReferenceCount> Semaphores = new Dictionary<object, SemaphoreReferenceCount>();

    SemaphoreSlim GetOrCreateSemaphore(object key) {
        lock (Semaphores) {
            if (Semaphores.TryGetValue(key, out var item)) {
                item.IncrementCount();
            } else {
                item = new SemaphoreReferenceCount();
                Semaphores[key] = item;
            }
            return item.Semaphore;
        }
    }

    public IDisposable Lock(object key) {
        GetOrCreateSemaphore(key).Wait();
        return new Releaser(Semaphores, key);
    }

    public async Task<IDisposable> LockAsync(object key) {
        await GetOrCreateSemaphore(key).WaitAsync().ConfigureAwait(false);
        return new Releaser(Semaphores, key);
    }

    sealed class SemaphoreReferenceCount {
        public readonly SemaphoreSlim Semaphore = new SemaphoreSlim(1, 1);
        public int Count { get; private set; } = 1;

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void IncrementCount() => Count++;

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void DecrementCount() => Count--;
    }

    sealed class Releaser : IDisposable {
        readonly Dictionary<object, SemaphoreReferenceCount> Semaphores;
        readonly object Key;

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public Releaser(Dictionary<object, SemaphoreReferenceCount> semaphores, object key) {
            Semaphores = semaphores;
            Key = key;
        }

        public void Dispose() {
            lock (Semaphores) {
                var item = Semaphores[Key];
                item.DecrementCount();
                if (item.Count == 0)
                    Semaphores.Remove(Key);
                item.Semaphore.Release();
            }
        }
    }
}
于 2018-08-28T05:45:44.650 回答
0

受此先前答案的启发,这是一个支持异步等待的版本:

    public class KeyedLock<TKey>
    {
        private readonly ConcurrentDictionary<TKey, LockInfo> _locks = new();

        public int Count => _locks.Count;

        public async Task<IDisposable> WaitAsync(TKey key, CancellationToken cancellationToken = default)
        {
            // Get the current info or create a new one.
            var info = _locks.AddOrUpdate(key,
                // Add
                k => new LockInfo(),
                // Update
                (k, v) => v.Enter() ? v : new LockInfo());

            try
            {
                await info.Semaphore.WaitAsync(cancellationToken);

                return new Releaser(() => Release(key, info, true));
            }
            catch (OperationCanceledException)
            {
                // The semaphore wait was cancelled, release the lock.
                Release(key, info, false);
                throw;
            }
        }

        private void Release(TKey key, LockInfo info, bool isCurrentlyLocked)
        {
            if (info.Leave())
            {
                // This was the last lock for the key.

                // Only remove this exact info, in case another thread has
                // already put its own info into the dictionary
                // Note that this call to Remove(entry) is in fact thread safe.
                var entry = new KeyValuePair<TKey, LockInfo>(key, info);
                if (((ICollection<KeyValuePair<TKey, LockInfo>>)_locks).Remove(entry))
                {
                    // This exact info was removed.
                    info.Dispose();
                }
            }
            else if (isCurrentlyLocked)
            {
                // There is another waiter.
                info.Semaphore.Release();
            }
        }

        private class LockInfo : IDisposable
        {
            private SemaphoreSlim _semaphore = null;
            private int _refCount = 1;

            public SemaphoreSlim Semaphore
            {
                get
                {
                    // Lazily create the semaphore.
                    var s = _semaphore;
                    if (s is null)
                    {
                        s = new SemaphoreSlim(1, 1);

                        // Assign _semaphore if its current value is null.
                        var original = Interlocked.CompareExchange(ref _semaphore, s, null);

                        // If someone else already created a semaphore, return that one
                        if (original is not null)
                        {
                            s.Dispose();
                            return original;
                        }
                    }
                    return s;
                }
            }

            // Returns true if successful
            public bool Enter()
            {
                if (Interlocked.Increment(ref _refCount) > 1)
                {
                    return true;
                }

                // This lock info is not valid anymore - its semaphore is or will be disposed.
                return false;
            }

            // Returns true if this lock info is now ready for removal
            public bool Leave()
            {
                if (Interlocked.Decrement(ref _refCount) <= 0)
                {
                    // This was the last lock
                    return true;
                }

                // There is another waiter
                return false;
            }

            public void Dispose() => _semaphore?.Dispose();
        }

        private sealed class Releaser : IDisposable
        {
            private readonly Action _dispose;

            public Releaser(Action dispose) => _dispose = dispose;

            public void Dispose() => _dispose();
        }
    }
于 2022-01-06T22:06:37.087 回答