192

我有以下课程。

class Test{
    public HashSet<string> Data = new HashSet<string>();
}

我需要从不同的线程更改字段“数据”,所以我想对我当前的线程安全实现提出一些意见。

class Test{
    public HashSet<string> Data = new HashSet<string>();

    public void Add(string Val){
            lock(Data) Data.Add(Val);
    }

    public void Remove(string Val){
            lock(Data) Data.Remove(Val);
    }
}

有没有更好的解决方案,直接进入现场并保护它免受多个线程的并发访问?

4

6 回答 6

213

你的实现是正确的。不幸的是,.NET Framework 不提供内置的并发哈希集类型。但是,有一些解决方法。

并发字典(推荐)

这第一个是使用ConcurrentDictionary<TKey, TValue>命名空间中的类System.Collections.Concurrent。在这种情况下,该值是没有意义的,所以我们可以使用一个简单的byte(内存中的 1 个字节)。

private ConcurrentDictionary<string, byte> _data;

这是推荐的选项,因为该类型是线程安全的,并且为您提供与HashSet<T>除了键和值是不同对象之外的相同的优点。

资料来源:社交 MSDN

并发包

如果您不介意重复的条目,您可以ConcurrentBag<T>在与前一个类相同的命名空间中使用该类。

private ConcurrentBag<string> _data;

自我实现

最后,正如您所做的那样,您可以使用锁或 .NET 为您提供的其他线程安全方式来实现自己的数据类型。这是一个很好的例子:How to implement ConcurrentHashSet in .Net

此解决方案的唯一缺点是该类型HashSet<T>没有正式的并发访问,即使对于读取操作也是如此。

我引用了链接帖子的代码(最初由Ben Mosher编写)。

using System;
using System.Collections.Generic;
using System.Threading;

namespace BlahBlah.Utilities
{
    public class ConcurrentHashSet<T> : IDisposable
    {
        private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
        private readonly HashSet<T> _hashSet = new HashSet<T>();

        #region Implementation of ICollection<T> ...ish
        public bool Add(T item)
        {
            _lock.EnterWriteLock();
            try
            {
                return _hashSet.Add(item);
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }

        public void Clear()
        {
            _lock.EnterWriteLock();
            try
            {
                _hashSet.Clear();
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }

        public bool Contains(T item)
        {
            _lock.EnterReadLock();
            try
            {
                return _hashSet.Contains(item);
            }
            finally
            {
                if (_lock.IsReadLockHeld) _lock.ExitReadLock();
            }
        }

        public bool Remove(T item)
        {
            _lock.EnterWriteLock();
            try
            {
                return _hashSet.Remove(item);
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }

        public int Count
        {
            get
            {
                _lock.EnterReadLock();
                try
                {
                    return _hashSet.Count;
                }
                finally
                {
                    if (_lock.IsReadLockHeld) _lock.ExitReadLock();
                }
            }
        }
        #endregion

        #region Dispose
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
                if (_lock != null)
                    _lock.Dispose();
        }
        ~ConcurrentHashSet()
        {
            Dispose(false);
        }
        #endregion
    }
}

编辑:将入口锁定方法移到try块之外,因为它们可能引发异常并执行finally块中包含的指令。

于 2013-09-20T18:04:17.087 回答
51

我没有包装 aConcurrentDictionary或锁定 a,HashSet而是创建了一个ConcurrentHashSet基于ConcurrentDictionary.

此实现支持每个项目HashSet的基本操作,而无需设置操作,因为它们在并发场景 IMO 中意义不大:

var concurrentHashSet = new ConcurrentHashSet<string>(
    new[]
    {
        "hamster",
        "HAMster",
        "bar",
    },
    StringComparer.OrdinalIgnoreCase);

concurrentHashSet.TryRemove("foo");

if (concurrentHashSet.Contains("BAR"))
{
    Console.WriteLine(concurrentHashSet.Count);
}

输出:2

您可以从这里的 NuGet 获取它,并此处查看 GitHub 上的源代码。

于 2017-02-10T17:43:17.757 回答
27

由于没有其他人提到它,我将提供一种替代方法,可能适合也可能不适合您的特定目的:

Microsoft 不可变集合

来自背后的 MS 团队的博客文章

虽然并发创建和运行比以往任何时候都容易,但仍然存在一个基本问题:可变共享状态。从多个线程读取通常非常容易,但是一旦需要更新状态,它就会变得更加困难,尤其是在需要锁定的设计中。

锁定的替代方法是使用不可变状态。不可变的数据结构保证永远不会改变,因此可以在不同的线程之间自由传递,而不必担心踩到别人的脚趾。

但是这种设计产生了一个新问题:如何在不每次都复制整个状态的情况下管理状态变化?当涉及到集合时,这尤其棘手。

这就是不可变集合的用武之地。

这些集合包括ImmutableHashSet<T>ImmutableList<T>

表现

由于不可变集合在底层使用树数据结构来实现结构共享,因此它们的性能特征与可变集合不同。与锁定可变集合相比,结果将取决于锁定争用和访问模式。然而,取自另一篇关于不可变集合的博文:

问:我听说不可变集合很慢。这些有什么不同吗?当性能或内存很重要时,我可以使用它们吗?

答:这些不可变集合经过高度调整,在平衡内存共享的同时具有与可变集合竞争的性能特征。在某些情况下,它们在算法上和实际时间上几乎与可变集合一样快,有时甚至更快,而在其他情况下,它们在算法上更复杂。然而,在许多情况下,差异可以忽略不计。通常,您应该使用最简单的代码来完成工作,然后根据需要调整性能。不可变集合可帮助您编写简单的代码,尤其是在必须考虑线程安全时。

换句话说,在许多情况下,差异不会很明显,您应该选择更简单的选择 - 对于并发集,使用ImmutableHashSet<T>,因为您没有现有的锁定可变实现!:-)

于 2016-04-06T10:05:45.530 回答
4

并发的棘手部分ISet<T>是集合方法(联合、交集、差异)本质上是迭代的。至少您必须遍历操作中涉及的一组集合的所有 n 个成员,同时锁定两个集合。

ConcurrentDictionary<T,byte>当您必须在迭代期间锁定整个集合时,您将失去 a 的优势。如果没有锁定,这些操作就不是线程安全的。

考虑到 的额外开销ConcurrentDictionary<T,byte>,使用更轻的重量HashSet<T>并将所有东西都包围在锁中可能更明智。

如果您不需要 set 操作,请在添加键时使用ConcurrentDictionary<T,byte>and just usedefault(byte)作为值。

于 2015-04-14T01:14:59.893 回答
2

我更喜欢完整的解决方案,所以我这样做了:请注意,我的 Count 是以不同的方式实现的,因为我不明白为什么在尝试计算其值时应该禁止读取哈希集。

@Zen,感谢您开始使用它。

[DebuggerDisplay("Count = {Count}")]
[Serializable]
public class ConcurrentHashSet<T> : ICollection<T>, ISet<T>, ISerializable, IDeserializationCallback
{
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

    private readonly HashSet<T> _hashSet = new HashSet<T>();

    public ConcurrentHashSet()
    {
    }

    public ConcurrentHashSet(IEqualityComparer<T> comparer)
    {
        _hashSet = new HashSet<T>(comparer);
    }

    public ConcurrentHashSet(IEnumerable<T> collection)
    {
        _hashSet = new HashSet<T>(collection);
    }

    public ConcurrentHashSet(IEnumerable<T> collection, IEqualityComparer<T> comparer)
    {
        _hashSet = new HashSet<T>(collection, comparer);
    }

    protected ConcurrentHashSet(SerializationInfo info, StreamingContext context)
    {
        _hashSet = new HashSet<T>();

        // not sure about this one really...
        var iSerializable = _hashSet as ISerializable;
        iSerializable.GetObjectData(info, context);
    }

    #region Dispose

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
            if (_lock != null)
                _lock.Dispose();
    }

    public IEnumerator<T> GetEnumerator()
    {
        return _hashSet.GetEnumerator();
    }

    ~ConcurrentHashSet()
    {
        Dispose(false);
    }

    public void OnDeserialization(object sender)
    {
        _hashSet.OnDeserialization(sender);
    }

    public void GetObjectData(SerializationInfo info, StreamingContext context)
    {
        _hashSet.GetObjectData(info, context);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    #endregion

    public void Add(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.Add(item);
        }
        finally
        {
            if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public void UnionWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        _lock.EnterReadLock();
        try
        {
            _hashSet.UnionWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            if (_lock.IsReadLockHeld) _lock.ExitReadLock();
        }
    }

    public void IntersectWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        _lock.EnterReadLock();
        try
        {
            _hashSet.IntersectWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            if (_lock.IsReadLockHeld) _lock.ExitReadLock();
        }
    }

    public void ExceptWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        _lock.EnterReadLock();
        try
        {
            _hashSet.ExceptWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            if (_lock.IsReadLockHeld) _lock.ExitReadLock();
        }
    }

    public void SymmetricExceptWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.SymmetricExceptWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsSubsetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsSubsetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsSupersetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsSupersetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsProperSupersetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsProperSupersetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsProperSubsetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsProperSubsetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool Overlaps(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Overlaps(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool SetEquals(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.SetEquals(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    bool ISet<T>.Add(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Add(item);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public void Clear()
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.Clear();
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool Contains(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Contains(item);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public void CopyTo(T[] array, int arrayIndex)
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.CopyTo(array, arrayIndex);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool Remove(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Remove(item);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public int Count
    {
        get
        {
            _lock.EnterWriteLock();
            try
            {
                return _hashSet.Count;
            }
            finally
            {
                if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }

        }
    }

    public bool IsReadOnly
    {
        get { return false; }
    }
}
于 2014-10-22T17:39:51.720 回答
-1

我发现,在需要良好性能的“高吞吐量”场景中,简单地锁定 System.Collections.Generic.HashSet 的 add 和 remove 方法,或者包装框架的 ConcurrentDictionary 都不够。

使用这个简单的想法已经可以实现相当好的性能:

public class ExampleHashSet<T>
{
    const int ConcurrencyLevel = 124;
    const int Lower31BitMask = 0x7FFFFFFF;
    
    HashSet<T>[] sets = new HashSet<T>[ConcurrencyLevel];
    IEqualityComparer<T> comparer;

    public ExampleHashSet()
    {
        comparer = EqualityComparer<T>.Default;

        for(int i = 0; i < ConcurrencyLevel; i++)
            sets[i] = new HashSet<T>();
    }

    public bool Add(T item)
    {
        int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
        
        lock(sets[hash]) 
        {
            return sets[hash].Add(item);
        }
    }

    public bool Remove(T item)
    {
        int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
        
        lock(sets[hash]) 
        {
            return sets[hash].Remove(item);
        }
    }

    // further methods ...
}

系统的 HashSet 被包装,但与其他答案不同,我们持有多个 HashSet 的锁。不同的线程能够在不同的 HashSet 上“工作”,从而降低整体等待时间。

这个想法可以直接在 HashSet 本身中推广和实现(在桶上持有锁,而不是锁定完整集)。一个例子可以在这里找到。

于 2021-09-13T16:35:41.573 回答