根本不使用锁怎么办:
/// <summary>
/// Swaps <c>arr[i]</c> and <c>arr[j]</c> without taking a lock: both slots are first
/// reserved by writing <paramref name="sentinel"/> into them, then the two original
/// values are written back crosswise.
/// </summary>
/// <param name="arr">Array whose elements are swapped.</param>
/// <param name="i">Index of the first slot.</param>
/// <param name="j">Index of the second slot.</param>
/// <param name="sentinel">Marker object shared by all swapping threads; must never be a real element value.</param>
/// <param name="spinWait">Spin helper forwarded to <c>ExchangeWithSentinel</c>.</param>
private void OptimisticalSwap(object[] arr, int i, int j, object sentinel, SpinWait spinWait)
{
    // Every call is counted, including the no-op case where both indices are equal.
    Interlocked.Increment(ref nSwap);
    if(i == j) {
        return;
    }

    // Reserve the slots in a fixed global order (higher index first). If slots were
    // reserved in caller order, two threads calling with (a, b) and (b, a) could each
    // hold one slot and spin forever waiting for the other one.
    int first = i > j ? i : j;
    int second = i > j ? j : i;

    object firstValue = ExchangeWithSentinel(arr, first, sentinel, spinWait);
    object secondValue = ExchangeWithSentinel(arr, second, sentinel, spinWait);

    // Writing the real values back releases both reservations and completes the swap.
    Interlocked.Exchange(ref arr[first], secondValue);
    Interlocked.Exchange(ref arr[second], firstValue);
}
/// <summary>
/// Reserves slot <paramref name="i"/> by atomically replacing its content with
/// <paramref name="sentinel"/>, and returns the value that was stored there.
/// If the slot already holds the sentinel, spins until a real value can be taken.
/// </summary>
/// <param name="arr">Array containing the slot.</param>
/// <param name="i">Index of the slot to reserve.</param>
/// <param name="sentinel">Marker object that denotes a reserved slot.</param>
/// <param name="spinWait">Spin helper; <c>SpinWait</c> is a value type, so this method works on its own copy.</param>
/// <returns>The non-sentinel value that was in the slot when it was reserved.</returns>
private object ExchangeWithSentinel(object[] arr, int i, object sentinel, SpinWait spinWait)
{
    // Fast path: try a single atomic exchange before touching the spin helper.
    var vi = Interlocked.Exchange(ref arr[i], sentinel);
    if(vi != sentinel) {
        return vi;
    }

    // Slow path: the slot was already reserved. Record the conflict once so that the
    // nConflict statistic actually reflects failed reservations, then spin until the
    // slot holds a real value again. Overwriting a sentinel with the sentinel is harmless.
    Interlocked.Increment(ref nConflict);
    spinWait.Reset();
    while(true) {
        spinWait.SpinOnce();
        vi = Interlocked.Exchange(ref arr[i], sentinel);
        if(vi != sentinel) {
            return vi;
        }
    }
}
哨兵只是一个占位用的空对象(dummy object),由所有执行交换的线程共享,用于“预留”待交换的位置。
// One sentinel instance is passed to every swap call; it is written into a slot to mark that slot as reserved.
var sentinel = new object();
我的笔记本电脑(i7)上的运行结果:
Run 0 took 272ms (nSwap=799984, nConflict=300)
Run 1 took 212ms (nSwap=799984, nConflict=706)
Run 2 took 237ms (nSwap=799984, nConflict=211)
Run 3 took 206ms (nSwap=799984, nConflict=633)
Run 4 took 228ms (nSwap=799984, nConflict=350)
nConflict 是交换未能成功预留位置的次数。与交换的总次数相比,它相当低,因此我针对没有冲突的情况优化了例程,仅在发生冲突时才进行自旋等待(SpinWait.SpinOnce)。
我测试的整个代码:
[TestClass]
public class ParallelShuffle
{
// Number of OptimisticalSwap calls in the current run; reset at the start of each run in Test.
private int nSwap = 0;
// Conflict counter reported per run; reset at the start of each run in Test.
private int nConflict = 0;
/// <summary>
/// Runs several worker tasks that concurrently swap elements of one shared array,
/// prints timing and counters for each run, and then asserts that no value appears
/// twice in the array.
/// </summary>
[TestMethod]
public void Test()
{
    const int size = 100000;
    const int thCount = 8;
    var sentinel = new object();
    var array = new object[size];
    for(int i = 0; i < array.Length; i++) {
        array[i] = i;
    }

    // Seeds are drawn on the test thread only. Creating a parameterless Random inside
    // every worker at nearly the same instant can yield identical sequences on runtimes
    // that seed from the system clock, making all workers swap the same pairs in lockstep.
    var seedSource = new Random();

    for(var nRun = 0; nRun < 10; ++nRun) {
        nConflict = 0;
        nSwap = 0;
        var sw = Stopwatch.StartNew();
        var tasks = new Task[thCount];
        for(int i = 0; i < thCount; ++i) {
            // Declared inside the loop body so each lambda captures its own seed.
            int seed = seedSource.Next();
            tasks[i] = Task.Factory.StartNew(() => {
                var rand = new Random(seed);
                var spinWait = new SpinWait();
                for(var count = array.Length - 1; count > 1; count--) {
                    // Random.Next(count) is exclusive of count, so y is always below count.
                    var y = rand.Next(count);
                    OptimisticalSwap(array, count, y, sentinel, spinWait);
                }
            }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
        Task.WaitAll(tasks);
        //Console.WriteLine(String.Join(", ", array));
        Console.WriteLine("Run {3} took {0}ms (nSwap={1}, nConflict={2})", sw.ElapsedMilliseconds, nSwap, nConflict, nRun);

        // Check for duplicates: each original value must be seen at most once. A leftover
        // sentinel would fail the int cast below.
        var checkArray = new bool[size];
        for(var i = 0; i < array.Length; ++i) {
            var value = (int) array[i];
            Assert.IsFalse(checkArray[value], "A double! (at {0} = {1})", i, value);
            checkArray[value] = true;
        }
    }
}
/// <summary>
/// Swaps <c>arr[i]</c> and <c>arr[j]</c> without taking a lock: both slots are first
/// reserved by writing <paramref name="sentinel"/> into them, then the two original
/// values are written back crosswise.
/// </summary>
/// <param name="arr">Array whose elements are swapped.</param>
/// <param name="i">Index of the first slot.</param>
/// <param name="j">Index of the second slot.</param>
/// <param name="sentinel">Marker object shared by all swapping threads; must never be a real element value.</param>
/// <param name="spinWait">Spin helper forwarded to <c>ExchangeWithSentinel</c>.</param>
private void OptimisticalSwap(object[] arr, int i, int j, object sentinel, SpinWait spinWait)
{
    // Every call is counted, including the no-op case where both indices are equal.
    Interlocked.Increment(ref nSwap);
    if(i == j) {
        return;
    }

    // Reserve the slots in a fixed global order (higher index first). If slots were
    // reserved in caller order, two threads calling with (a, b) and (b, a) could each
    // hold one slot and spin forever waiting for the other one.
    int first = i > j ? i : j;
    int second = i > j ? j : i;

    object firstValue = ExchangeWithSentinel(arr, first, sentinel, spinWait);
    object secondValue = ExchangeWithSentinel(arr, second, sentinel, spinWait);

    // Writing the real values back releases both reservations and completes the swap.
    Interlocked.Exchange(ref arr[first], secondValue);
    Interlocked.Exchange(ref arr[second], firstValue);
}
/// <summary>
/// Reserves slot <paramref name="i"/> by atomically replacing its content with
/// <paramref name="sentinel"/>, and returns the value that was stored there.
/// If the slot already holds the sentinel, spins until a real value can be taken.
/// </summary>
/// <param name="arr">Array containing the slot.</param>
/// <param name="i">Index of the slot to reserve.</param>
/// <param name="sentinel">Marker object that denotes a reserved slot.</param>
/// <param name="spinWait">Spin helper; <c>SpinWait</c> is a value type, so this method works on its own copy.</param>
/// <returns>The non-sentinel value that was in the slot when it was reserved.</returns>
private object ExchangeWithSentinel(object[] arr, int i, object sentinel, SpinWait spinWait)
{
    // Fast path: try a single atomic exchange before touching the spin helper.
    var vi = Interlocked.Exchange(ref arr[i], sentinel);
    if(vi != sentinel) {
        return vi;
    }

    // Slow path: the slot was already reserved. Record the conflict once so that the
    // nConflict statistic actually reflects failed reservations, then spin until the
    // slot holds a real value again. Overwriting a sentinel with the sentinel is harmless.
    Interlocked.Increment(ref nConflict);
    spinWait.Reset();
    while(true) {
        spinWait.SpinOnce();
        vi = Interlocked.Exchange(ref arr[i], sentinel);
        if(vi != sentinel) {
            return vi;
        }
    }
}