0

我必须通过并行交换随机索引元素来对数组进行洗牌。我的问题是如何防止其他线程读取和写入当前正在被另一个线程交换的元素。我不想在一个线程交换时锁定整个数组。

我想让几个线程同时交换不同的元素对。

我试过这样的事情:

        object[] lockArray = new object[array.Length];
        for (int i = 0; i < array.Length; i++)
            lockArray[i] = new object();

        for (int i = 0; i < thredCount; i++)
        {
            Thread t = new Thread(th => Shuffle.Shuflle(array,lockArray));
            t.Start();
        }

        public class Shuffle
        {
            public static void Shuflle(char[] array,object [] lockArray)
            {
                    for (int count = array.Length - 1; count > 1; count--)
                    {
                        Random rand = new Random();
                        int y = rand.Next(count) + 1;

                        lock (lockArray[count])
                        {
                            lock (lockArray[y])
                            {
                                char temp = array[count];
                                array[count] = array[y];
                                array[y] = temp;
                            }
                        }


                    }
            }
        }

在数组中有数字作为从 0 到 9 的字符,结果是重新排序的数字。但有时我会得到一个加倍的结果。138952469. 9 现在在 shuffled 数组中加倍,并且 7 丢失了。

请帮我诊断问题。

4

3 回答 3

4

根本不使用锁怎么办:

private void OptimisticalSwap(object[] arr, int i, int j, object sentinel, SpinWait spinWait)
{
  Interlocked.Increment(ref nSwap);
  if(i == j) return;
  var vi = ExchangeWithSentinel(arr, i, sentinel, spinWait);
  var vj = ExchangeWithSentinel(arr, j, sentinel, spinWait);
  Interlocked.Exchange(ref arr[i], vj);
  Interlocked.Exchange(ref arr[j], vi);
}

private object ExchangeWithSentinel(object[] arr, int i, object sentinel, SpinWait spinWait)
{
  spinWait.Reset();
  while(true) {
    var vi = Interlocked.Exchange(ref arr[i], sentinel);
    if(vi != sentinel) return vi;
    spinWait.SpinOnce();
  }
}

哨兵只是一些虚拟对象,在所有进行交换的线程之间共享,用于“保留”交换位置。

var sentinel = new object();

我的笔记本电脑(i7)上的运行结果:

Run 0 took 272ms (nSwap=799984, nConflict=300)
Run 1 took 212ms (nSwap=799984, nConflict=706)
Run 2 took 237ms (nSwap=799984, nConflict=211)
Run 3 took 206ms (nSwap=799984, nConflict=633)
Run 4 took 228ms (nSwap=799984, nConflict=350)

nConflict 是交换未能保留头寸的次数。与交换的总数相比,它相当低,因此我针对没有冲突的情况优化了例程,仅在发生冲突时调用 SpinUntil。

我测试的整个代码:

[TestClass]
  public class ParallelShuffle
  {
    private int nSwap = 0;
    private int nConflict = 0;
    [TestMethod]
    public void Test()
    {
      const int size = 100000;
      const int thCount = 8;
      var sentinel = new object();
      var array = new object[size];

      for(int i = 0; i < array.Length; i++)
        array[i] = i;

      for(var nRun = 0; nRun < 10; ++nRun) {
        nConflict = 0;
        nSwap = 0;
        var sw = Stopwatch.StartNew();
        var tasks = new Task[thCount];
        for(int i = 0; i < thCount; ++i) {
          tasks[i] = Task.Factory.StartNew(() => {
            var rand = new Random();
            var spinWait = new SpinWait();
            for(var count = array.Length - 1; count > 1; count--) {
              var y = rand.Next(count);
              OptimisticalSwap(array, count, y, sentinel, spinWait);
            }
          }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }

        Task.WaitAll(tasks);

        //Console.WriteLine(String.Join(", ", array));
        Console.WriteLine("Run {3} took {0}ms (nSwap={1}, nConflict={2})", sw.ElapsedMilliseconds, nSwap, nConflict, nRun);
        // check for doubles:
        var checkArray = new bool[size];
        for(var i = 0; i < array.Length; ++i) {
          var value = (int) array[i];
          Assert.IsFalse(checkArray[value], "A double! (at {0} = {1})", i, value);
          checkArray[value] = true;
        }
      }
    }


   private void OptimisticalSwap(object[] arr, int i, int j, object sentinel, SpinWait spinWait)
    {
      Interlocked.Increment(ref nSwap);
      if(i == j) return;
      var vi = ExchangeWithSentinel(arr, i, sentinel, spinWait);
      var vj = ExchangeWithSentinel(arr, j, sentinel, spinWait);
      Interlocked.Exchange(ref arr[i], vj);
      Interlocked.Exchange(ref arr[j], vi);
    }

    private object ExchangeWithSentinel(object[] arr, int i, object sentinel, SpinWait spinWait)
    {
      spinWait.Reset();
      while(true) {
        var vi = Interlocked.Exchange(ref arr[i], sentinel);
        if(vi != sentinel) return vi;
        spinWait.SpinOnce();
      }
    }
于 2012-08-30T08:39:48.387 回答
3

只是出于好奇,为什么要允许多个线程并行交换?交换不应该花费很长时间,因此在交换期间锁定整个数组可能比锁定单个元素的任何尝试要快得多。

也就是说,如果你真的想做并行洗牌,你最好这样做:

  1. 用单独的线程对数组的每一半进行洗牌(它们不能冲突)。
  2. 对数组进行“完美洗牌”(即从两侧交错元素;有关更多信息,请参阅Faro Shuffle。)
  3. 再次洗牌阵列的每一半。

如果您想做四个线程值得洗牌,您可以将其概括为四个部分。也就是说,“交换”必须是一个非常缓慢的操作,才能从中获得任何性能优势。

于 2012-08-29T22:19:16.577 回答
0

一种简单的方法是使用 Fisher-Yates 算法创建映射列表,然后在使用映射执行转置 en-mass 时锁定源数组上的所有操作。但是,这会消耗额外的资源,会花费更长的时间,并且只会略微减少阵列的不稳定时间。您也可以像这样构建一个新结果。

public static IEnumerable<T> Shuffle<T>(
    this IEnumerable<T> source,
    Random random = null)
{
    random = random ?? new Random();
    var list = source.ToList();

    for (int i = list.Length; i > 1; i--)
    {
        // Pick random element to swap.
        int j = random.Next(i); // 0 <= j <= i-1;

        // Swap.
        T tmp = list[j];
        list[j] = list[i - 1];
        list[i - 1] = tmp;
    }

    return list;
}

var shuffler = new Task<char[]>(() => return array.Shuffle().ToArray());
array = shuffler.Result;

如果你想在多个线程上洗牌,你需要一个不同的算法和一个大的资源来让它值得。

于 2012-08-30T10:02:20.550 回答