5

本文的底部描述了使用 GetOrAdd 可能如何导致(如果我理解正确的话)损坏/意外结果。

剪辑/

ConcurrentDictionary 专为多线程场景而设计。您不必在代码中使用锁来添加或删除集合中的项目。但是,一个线程总是可以检索一个值,而另一个线程通过给同一个键一个新值来立即更新集合。

此外,虽然 ConcurrentDictionary 的所有方法都是线程安全的,但并非所有方法都是原子的,特别是 GetOrAdd 和 AddOrUpdate。传递给这些方法的用户委托是在字典的内部锁之外调用的。(这样做是为了防止未知代码阻塞所有线程。)因此,可能会发生以下事件序列:

1) threadA 调用 GetOrAdd,没有找到任何项目并通过调用 valueFactory 委托创建一个新项目以添加。

2)threadB同时调用GetOrAdd,调用了它的valueFactory委托,在threadA之前到达了内部锁,因此它的新键值对被添加到字典中。

3)threadA的用户委托完成,线程到达锁,但现在看到item已经存在

4)threadA执行“Get”,并返回threadB之前添加的数据。

因此,不能保证 GetOrAdd 返回的数据与线程的 valueFactory 创建的数据相同。调用 AddOrUpdate 时会发生类似的事件序列。

问题

验证数据并重试更新的正确方法是什么?一个不错的方法是根据旧值的内容尝试/重试此操作的扩展方法。

这将如何实施?我可以依赖结果 ( verify) 作为有效的最终状态,还是必须重试并使用不同的方法重新检索这些值?

代码

以下代码在更新值时存在竞争条件。期望的行为是 AddOrUpdateWithoutRetrieving() 将以不同的方式增加各种值(使用++or Interlocked.Increment())。

我还想在一个单元中执行多个字段操作,如果之前的更新由于竞争条件而没有“采取”,则重试更新。

运行代码,您将看到控制台中出现的每个值开始增加 1,但每个值都会漂移,并且有些值会在前面/后面进行几次迭代。

namespace DictionaryHowTo
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    // The type of the Value to store in the dictionary:
    class FilterConcurrentDuplicate
    {
        // Create a new concurrent dictionary.
        readonly ConcurrentDictionary<int, TestData> eventLogCache = 
             new ConcurrentDictionary<int, TestData>();

        static void Main()
        {
            FilterConcurrentDuplicate c = new FilterConcurrentDuplicate();

            c.DoRace(null);
        }

        readonly ConcurrentDictionary<int, TestData> concurrentCache = 
            new ConcurrentDictionary<int, TestData>();
        void DoRace(string[] args)
        {
            int max = 1000;

            // Add some key/value pairs from multiple threads.
            Task[] tasks = new Task[3];

            tasks[0] = Task.Factory.StartNew(() =>
            {

                System.Random RandNum = new System.Random();
                int MyRandomNumber = RandNum.Next(1, 500);

                Thread.Sleep(MyRandomNumber);
                AddOrUpdateWithoutRetrieving();

            });

            tasks[1] = Task.Factory.StartNew(() =>
            {
                System.Random RandNum = new System.Random();
                int MyRandomNumber = RandNum.Next(1, 1000);

                Thread.Sleep(MyRandomNumber);

                AddOrUpdateWithoutRetrieving();

            });

            tasks[2] = Task.Factory.StartNew(() =>
            {
                AddOrUpdateWithoutRetrieving();

            });
            // Output results so far.
            Task.WaitAll(tasks);

            AddOrUpdateWithoutRetrieving();

            Console.WriteLine("Press any key.");
            Console.ReadKey();
        }
        public class TestData : IEqualityComparer<TestData>
        {
            public string aStr1 { get; set; }
            public Guid? aGud1 { get; set; }
            public string aStr2 { get; set; }
            public int aInt1 { get; set; }
            public long? aLong1 { get; set; }

            public DateTime aDate1 { get; set; }
            public DateTime? aDate2 { get; set; }

            //public int QueryCount { get; set; }
            public int QueryCount = 0;//

            public string zData { get; set; }
            public bool Equals(TestData x, TestData y)
            {
                return x.aStr1 == y.aStr1 &&
                    x.aStr2 == y.aStr2 &&
                       x.aGud1 == y.aGud1 &&
                       x.aStr2 == y.aStr2 &&
                       x.aInt1 == y.aInt1 &&
                       x.aLong1 == y.aLong1 &&
                       x.aDate1 == y.aDate1 &&
                       x.QueryCount == y.QueryCount ;
            }

            public int GetHashCode(TestData obj)
            {
                TestData ci = (TestData)obj;
                // http://stackoverflow.com/a/263416/328397
                return 
                  new { 
                         A = ci.aStr1, 
                         Aa = ci.aStr2, 
                         B = ci.aGud1, 
                         C = ci.aStr2, 
                         D = ci.aInt1, 
                         E = ci.aLong1, 
                         F = ci.QueryCount , 
                         G = ci.aDate1}.GetHashCode();
            }
        }
        private   void AddOrUpdateWithoutRetrieving()
        {
            // Sometime later. We receive new data from some source.
            TestData ci = new TestData() 
            { 
              aStr1 = "Austin", 
              aGud1 = new Guid(), 
              aStr2 = "System", 
              aLong1 = 100, 
              aInt1 = 1000, 
              QueryCount = 0, 
              aDate1 = DateTime.MinValue
            };

            TestData verify = concurrentCache.AddOrUpdate(123, ci,
                (key, existingVal) =>
                {
                    existingVal.aStr2 = "test1" + existingVal.QueryCount;
                    existingVal.aDate1 = DateTime.MinValue;
                    Console.WriteLine
                     ("Thread:" + Thread.CurrentThread.ManagedThreadId + 
                          "  Query Count A:" + existingVal.QueryCount);
                    Interlocked.Increment(ref existingVal.QueryCount);
                    System.Random RandNum = new System.Random();
                    int MyRandomNumber = RandNum.Next(1, 1000);

                    Thread.Sleep(MyRandomNumber);
                    existingVal.aInt1++;
                    existingVal.aDate1 = 
                         existingVal.aDate1.AddSeconds
                         (existingVal.aInt1);  
                    Console.WriteLine(
                          "Thread:" + Thread.CurrentThread.ManagedThreadId + 
                           "  Query Count B:" + existingVal.QueryCount);
                    return existingVal;
                });


            // After each run, every value here should be ++ the previous value
            Console.WriteLine(
                "Thread:"+Thread.CurrentThread.ManagedThreadId + 
                 ": Query Count returned:" + verify.QueryCount + 
                 " eid:" + verify.aInt1 + " date:" +  
                 verify.aDate1.Hour + " "  + verify.aDate1.Second + 
                 " NAME:" + verify.aStr2
                );
        }

    }
}

输出

Thread:12: Query Count returned:0 eid:1000 date:0 0 NAME:System

Thread:12  Query Count A:0
Thread:13  Query Count A:1
Thread:12  Query Count B:2
Thread:12: Query Count returned:2 eid:1001 date:0 41 NAME:test11

Thread:12  Query Count A:2
Thread:13  Query Count B:3
Thread:13: Query Count returned:3 eid:1002 date:0 42 NAME:test12

Thread:13  Query Count A:3
Thread:11  Query Count A:4
Thread:11  Query Count B:5
Thread:11: Query Count returned:5 eid:1003 date:0 43 NAME:test14

Thread:11  Query Count A:5
Thread:13  Query Count B:6
Thread:13: Query Count returned:6 eid:1004 date:0 44 NAME:test15

……

Thread:11  Query Count A:658
Thread:11  Query Count B:659
Thread:11: Query Count returned:659 eid:1656 date:0 36 NAME:test1658

Thread:11  Query Count A:659
Thread:11  Query Count B:660
Thread:11: Query Count returned:660 eid:1657 date:0 37 NAME:test1659

Thread:11  Query Count A:660
Thread:11  Query Count B:661
Thread:11: Query Count returned:661 eid:1658 date:0 38 NAME:test1660

Thread:11  Query Count A:661
Thread:11  Query Count B:662
Thread:11: Query Count returned:662 eid:1659 date:0 39 NAME:test1661

在此代码中,“eid”应始终比查询计数多 1,000,但在迭代中,两者之间的差异从 1 到 7 不等。这种不一致可能会导致某些应用程序失败或报告不正确的数据。

4

4 回答 4

4

此提交基于对文章“<em>如何:从 ConcurrentDictionary 添加和删除项目” http://msdn.microsoft.com/en-us/library/dd997369底部的注释的错误理解。 aspx和一个基本的并发错误 - 共享对象的并发非原子修改。

首先,让我们澄清一下链接文章的真正含义。我将使用 AddOrUpdate 作为示例,但 GetOrAdd 的推理是等效的。

假设您从多个线程调用 AddOrUpdate 并指定相同的键。假设已经存在具有该键的条目。每个线程都会出现,注意已经有一个具有指定键的条目,并且 AddOrUpdate 的更新部分是相关的。这样做时,没有线程会锁定字典。相反,它将使用一些互锁指令来自动检查条目键是否存在。

所以,我们的几个线程都注意到key存在,需要调用updateValueFactory。该委托被传递给 AddOrUpdate;它引用现有的键和值并返回更新值。现在,所有涉及的线程都将同时调用工厂。它们都将以一些以前未知的顺序完成,并且每个线程都将尝试使用原子操作(使用互锁指令)将现有值替换为其刚刚计算的值。没有办法知道哪个线程会“获胜”。获胜的线程将存储其计算值。其他人会注意到字典中的值不再是作为参数传递给他们的 updateValueFactory 的值。作为对这一认识的回应,他们将放弃操作并丢弃刚刚计算的值。

接下来,让我们澄清一下为什么在运行此处列出的代码示例时会得到奇怪的值:

回想一下,传递给 AddOrUpdate 的 updateValueFactory 委托使用 REFERENCES 现有的键和值并返回更新值。AddOrUpdateWithoutRetrieving() 方法中的代码示例开始直接对该引用执行操作。它不是创建新的替换值并修改 THAT,而是修改existingVal 的实例成员值——一个已经在字典中的对象——然后简单地返回该引用。它不是原子的——它读取一些值,更新一些值,读取更多,更新更多。当然,我们在上面已经看到,这同时发生在多个线程上——它们都修改了相同的对象。难怪结果是在任何时候(当代码示例调用 WriteLine 时),对象都包含源自不同线程的成员实例值。

字典与此无关——代码只是以非原子方式修改线程之间共享的对象。这是最常见的并发错误之一。两种最常见的解决方法取决于具体情况。要么使用共享锁使整个对象修改原子化,要么先原子地复制整个对象,然后修改本地副本。

对于后者,请尝试将其添加到 TestData 类:

private Object _copyLock = null;

private Object GetLock() {

    if (_copyLock != null)
        return _copyLock;

    Object newLock = new Object();
    Object prevLock = Interlocked.CompareExchange(ref _copyLock, newLock, null);
    return (prevLock == null) ? newLock : prevLock;
}

public TestData Copy() {

    lock (GetLock()) {
        TestData copy = new TestData();
        copy.aStr1 = this.aStr1;
        copy.aStr2 = this.aStr2;
        copy.aLong1 = this.aLong1;
        copy.aInt1 = this.aInt1;
        copy.QueryCount = this.QueryCount;
        copy.aDate1 = this.aDate1;
        copy.aDate2 = this.aDate2;
        copy.zData = this.zData;

        return copy;
    }
}

然后修改工厂如下:

TestData verify = concurrentCache.AddOrUpdate(123, ci,
    (key, existingVal) =>
    {
        TestData newVal = existingVal.Copy();
        newVal.aStr2 = "test1" + newVal.QueryCount;
        newVal.aDate1 = DateTime.MinValue;
        Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + "  Query Count A:" + newVal.QueryCount);
        Interlocked.Increment(ref newVal.QueryCount);
        System.Random RandNum = new System.Random();
        int MyRandomNumber = RandNum.Next(1, 1000);

        Thread.Sleep(MyRandomNumber);
        newVal.aInt1++;
        newVal.aDate1 = newVal.aDate1.AddSeconds(newVal.aInt1);
        Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + "  Query Count B:" + newVal.QueryCount);
        return newVal;
    });

我希望这有帮助。

于 2012-08-16T20:00:01.173 回答
3

可能正确的方法是不关心返回的值是否不是由valueFactory. 如果这不可接受,则需要使用锁。

于 2012-05-19T18:15:53.657 回答
2

没有总能奏效的一般保护措施。但一个常见的解决方法是返回 aLazy<T>而不是 a T。这样创建不需要的懒惰并没有害处,因为永远不会启动。只有一个 Lazy 会使其成为 key 对应的最终值。只会返回一个特定的 Lazy 实例。

于 2012-05-19T22:00:53.867 回答
1

你可以使用这个GetOrAdd人自己的实现。请注意,即使在这里也可以调用工厂,而无需将其结果添加到字典中。但是你会知道发生了什么。

于 2012-05-20T09:24:48.220 回答