51

我有一个关于并行 for 循环的问题。我有以下代码:

    /// <summary>
    /// Multiplies every element of <paramref name="array"/> by <paramref name="factor"/>, in place,
    /// on the calling thread.
    /// </summary>
    /// <param name="array">Array whose elements are overwritten with the scaled values.</param>
    /// <param name="factor">Multiplier applied to each element.</param>
    public static void MultiplicateArray(double[] array, double factor)
    {
        int index = 0;
        while (index < array.Length)
        {
            array[index] *= factor;
            index++;
        }
    }

    /// <summary>
    /// Multiplies each element of <paramref name="arrayToChange"/> by the element at the same
    /// index of <paramref name="multiplication"/>, in place.
    /// </summary>
    /// <param name="arrayToChange">Array that is overwritten with the products; its length drives the loop.</param>
    /// <param name="multiplication">Per-element multipliers, read at every index of <paramref name="arrayToChange"/>.</param>
    public static void MultiplicateArray(double[] arrayToChange, double[] multiplication)
    {
        for (int position = 0; position < arrayToChange.Length; ++position)
        {
            arrayToChange[position] *= multiplication[position];
        }
    }

    /// <summary>
    /// Multiplies each element of <paramref name="arrayToChange"/> by the value found in column
    /// <paramref name="dimension"/> of the matching row of <paramref name="multiArray"/>, in place.
    /// </summary>
    /// <param name="arrayToChange">Array that is overwritten with the products; its length drives the loop.</param>
    /// <param name="multiArray">Two-dimensional source of multipliers, indexed as [element index, dimension].</param>
    /// <param name="dimension">Second index used for every lookup into <paramref name="multiArray"/>.</param>
    public static void MultiplicateArray(double[] arrayToChange, double[,] multiArray, int dimension)
    {
        int row = 0;
        while (row < arrayToChange.Length)
        {
            double current = arrayToChange[row];
            arrayToChange[row] = current * multiArray[row, dimension];
            row++;
        }
    }

现在我尝试添加并行功能:

    /// <summary>
    /// Multiplies every element of <paramref name="array"/> by <paramref name="factor"/>, in place,
    /// handing each index to <c>Parallel.For</c> as a separate delegate invocation.
    /// </summary>
    /// <param name="array">Array whose elements are overwritten with the scaled values.</param>
    /// <param name="factor">Multiplier applied to each element.</param>
    public static void MultiplicateArray(double[] array, double factor)
    {
        // One delegate call per element: the body is a single multiplication.
        Parallel.For(0, array.Length, index => array[index] *= factor);
    }

    /// <summary>
    /// Multiplies each element of <paramref name="arrayToChange"/> by the element at the same
    /// index of <paramref name="multiplication"/>, in place, using <c>Parallel.For</c> with one
    /// delegate invocation per index.
    /// </summary>
    /// <param name="arrayToChange">Array that is overwritten with the products; its length is the loop bound.</param>
    /// <param name="multiplication">Per-element multipliers, read at every index of <paramref name="arrayToChange"/>.</param>
    public static void MultiplicateArray(double[] arrayToChange, double[] multiplication)
    {
        int count = arrayToChange.Length;

        Parallel.For(0, count, position =>
        {
            arrayToChange[position] *= multiplication[position];
        });
    }

    /// <summary>
    /// Multiplies each element of <paramref name="arrayToChange"/> by the value found in column
    /// <paramref name="dimension"/> of the matching row of <paramref name="multiArray"/>, in place,
    /// using <c>Parallel.For</c> with one delegate invocation per index.
    /// </summary>
    /// <param name="arrayToChange">Array that is overwritten with the products; its length is the loop bound.</param>
    /// <param name="multiArray">Two-dimensional source of multipliers, indexed as [element index, dimension].</param>
    /// <param name="dimension">Second index used for every lookup into <paramref name="multiArray"/>.</param>
    public static void MultiplicateArray(double[] arrayToChange, double[,] multiArray, int dimension)
    {
        Parallel.For(
            0,
            arrayToChange.Length,
            row => arrayToChange[row] *= multiArray[row, dimension]);
    }

问题是,我想节省时间,而不是浪费时间。使用标准 for 循环计算大约 2 分钟,但使用并行 for 循环需要 3 分钟。为什么?

4

5 回答 5

69

Parallel.For()通过并行化代码可以大大提高性能,但它也有开销(线程之间的同步,在每次迭代时调用委托)。而且由于在您的代码中,每次迭代都非常短(基本上只有几条 CPU 指令),因此这种开销可能会变得很突出。

因此,我认为使用 Parallel.For() 并不是适合您的解决方案。相反,如果您手动并行化代码(在这种情况下非常简单),您可能会看到性能提高。

为了验证这一点,我进行了一些测量:我在一个包含 200 000 000 个元素的数组上运行了 MultiplicateArray() 的不同实现(我使用的代码如下)。在我的机器上,串行版本始终需要 0.21 秒,Parallel.For() 通常需要 0.45 秒左右,但有时会飙升到 8-9 秒!

首先,我将尝试改进常见情况,稍后我会谈到这些尖峰。我们想用N个 CPU 处理这个数组,所以我们把它分成N个大小相等的部分,分别处理每个部分。结果?0.35 秒。这仍然比串行版本差。但是for遍历数组中的每个项目是最优化的结构之一。我们不能做点什么来帮助编译器吗?提取计算循环的边界可能会有所帮助。事实证明确实如此:0.18 秒。这比串行版本好,但不是很多。而且,有趣的是,在我的 4 核机器(无超线程)上将并行度从 4 更改为 2 并没有改变结果:仍然是 0.18 秒。这让我得出结论,CPU 不是这里的瓶颈,内存带宽才是。

现在,回到这些尖峰:我的自定义并行化没有出现尖峰,但 Parallel.For() 有,为什么?Parallel.For() 确实使用范围分区,这意味着每个线程都处理数组中属于自己的那一部分。但是,如果一个线程提前完成,它会尝试帮助处理另一个尚未完成的线程的范围。一旦发生这种情况,就会出现大量伪共享(false sharing),这可能会大大降低代码速度。我自己强制制造伪共享的测试似乎表明这确实是问题所在。为 Parallel.For() 显式指定并行度似乎有助于缓解尖峰。

当然,所有这些测量都是特定于我计算机上的硬件的,对你来说会有所不同,所以你应该自己测量。

我使用的代码:

/// <summary>
/// Benchmark driver: fills a 200,000,000-element array with ones, then runs every
/// implementation five times in a fixed order and prints the wall-clock time of each call.
/// </summary>
static void Main()
{
    double[] array = new double[200 * 1000 * 1000];

    for (int index = 0; index < array.Length; index++)
    {
        array[index] = 1;
    }

    // Output format strings and the implementations they label, kept in the same order
    // so that formats[k] always describes implementations[k].
    string[] formats =
    {
        "Serial: {0:f2} s",
        "Parallel.For: {0:f2} s",
        "Parallel.For (degree of parallelism): {0:f2} s",
        "Custom parallel: {0:f2} s",
        "Custom parallel (extracted max): {0:f2} s",
        "Custom parallel (extracted max, half parallelism): {0:f2} s",
        "Custom parallel (false sharing): {0:f2} s",
    };

    Action<double[], double>[] implementations =
    {
        Serial,
        ParallelFor,
        ParallelForDegreeOfParallelism,
        CustomParallel,
        CustomParallelExtractedMax,
        CustomParallelExtractedMaxHalfParallelism,
        CustomParallelFalseSharing,
    };

    for (int round = 0; round < 5; round++)
    {
        for (int k = 0; k < implementations.Length; k++)
        {
            // A fresh stopwatch per measurement; only the call itself is timed.
            Stopwatch timer = Stopwatch.StartNew();
            implementations[k](array, 2);
            Console.WriteLine(formats[k], timer.Elapsed.TotalSeconds);
        }
    }
}

/// <summary>
/// Baseline: multiplies every element of <paramref name="array"/> by
/// <paramref name="factor"/>, in place, with a plain single-threaded loop.
/// </summary>
/// <param name="array">Array whose elements are overwritten with the scaled values.</param>
/// <param name="factor">Multiplier applied to each element.</param>
static void Serial(double[] array, double factor)
{
    for (int index = 0; index < array.Length; ++index)
    {
        array[index] *= factor;
    }
}

/// <summary>
/// Multiplies every element of <paramref name="array"/> by <paramref name="factor"/>, in place,
/// using <c>Parallel.For</c> with default options and one delegate invocation per index.
/// </summary>
/// <param name="array">Array whose elements are overwritten with the scaled values.</param>
/// <param name="factor">Multiplier applied to each element.</param>
static void ParallelFor(double[] array, double factor)
{
    Parallel.For(0, array.Length, index => array[index] *= factor);
}

/// <summary>
/// Same as the plain <c>Parallel.For</c> variant, but caps the loop's
/// <c>MaxDegreeOfParallelism</c> at <c>Environment.ProcessorCount</c>.
/// </summary>
/// <param name="array">Array whose elements are overwritten with the scaled values.</param>
/// <param name="factor">Multiplier applied to each element.</param>
static void ParallelForDegreeOfParallelism(double[] array, double factor)
{
    int elementCount = array.Length;

    var options = new ParallelOptions();
    options.MaxDegreeOfParallelism = Environment.ProcessorCount;

    Parallel.For(0, elementCount, options, index => array[index] *= factor);
}

/// <summary>
/// Multiplies every element of <paramref name="array"/> by <paramref name="factor"/>, in place,
/// by splitting the array into <c>Environment.ProcessorCount</c> contiguous ranges and
/// processing each range in its own task.
/// </summary>
/// <remarks>
/// The upper bound of each range is deliberately recomputed in the loop condition on every
/// iteration; the "extracted max" variant hoists it so the two can be compared.
/// </remarks>
/// <param name="array">Array whose elements are overwritten with the scaled values.</param>
/// <param name="factor">Multiplier applied to each element.</param>
static void CustomParallel(double[] array, double factor)
{
    var degreeOfParallelism = Environment.ProcessorCount;

    var tasks = new Task[degreeOfParallelism];

    for (int taskNumber = 0; taskNumber < degreeOfParallelism; taskNumber++)
    {
        // Copy the loop variable: the lambda would otherwise capture the shared
        // taskNumber, which keeps changing while the tasks are being started.
        int taskNumberCopy = taskNumber;

        tasks[taskNumber] = Task.Factory.StartNew(
            () =>
            {
                // The range bounds are computed in 64-bit arithmetic: in 32-bit,
                // array.Length * (taskNumberCopy + 1) overflows for large arrays on
                // machines with many processors (e.g. 200,000,000 * 11 > int.MaxValue).
                // The quotient never exceeds array.Length, so the cast back is safe.
                for (int i = (int)((long)array.Length * taskNumberCopy / degreeOfParallelism);
                    i < (long)array.Length * (taskNumberCopy + 1) / degreeOfParallelism;
                    i++)
                {
                    array[i] = array[i] * factor;
                }
            });
    }

    Task.WaitAll(tasks);
}

/// <summary>
/// Multiplies every element of <paramref name="array"/> by <paramref name="factor"/>, in place,
/// by splitting the array into <c>Environment.ProcessorCount</c> contiguous ranges and
/// processing each range in its own task. The upper bound of each range is computed once,
/// before the inner loop starts.
/// </summary>
/// <param name="array">Array whose elements are overwritten with the scaled values.</param>
/// <param name="factor">Multiplier applied to each element.</param>
static void CustomParallelExtractedMax(double[] array, double factor)
{
    var degreeOfParallelism = Environment.ProcessorCount;

    var tasks = new Task[degreeOfParallelism];

    for (int taskNumber = 0; taskNumber < degreeOfParallelism; taskNumber++)
    {
        // Copy the loop variable: the lambda would otherwise capture the shared
        // taskNumber, which keeps changing while the tasks are being started.
        int taskNumberCopy = taskNumber;

        tasks[taskNumber] = Task.Factory.StartNew(
            () =>
            {
                // Compute both bounds in 64-bit arithmetic: in 32-bit,
                // array.Length * (taskNumberCopy + 1) overflows for large arrays on
                // machines with many processors. Each quotient is at most
                // array.Length, so narrowing back to int is safe.
                long length = array.Length;
                int start = (int)(length * taskNumberCopy / degreeOfParallelism);
                int max = (int)(length * (taskNumberCopy + 1) / degreeOfParallelism);

                for (int i = start; i < max; i++)
                {
                    array[i] = array[i] * factor;
                }
            });
    }

    Task.WaitAll(tasks);
}

/// <summary>
/// Multiplies every element of <paramref name="array"/> by <paramref name="factor"/>, in place,
/// using half as many tasks as there are processors (but always at least one). Each task
/// processes one contiguous range whose upper bound is computed once.
/// </summary>
/// <param name="array">Array whose elements are overwritten with the scaled values.</param>
/// <param name="factor">Multiplier applied to each element.</param>
static void CustomParallelExtractedMaxHalfParallelism(double[] array, double factor)
{
    // ProcessorCount / 2 is 0 on a single-processor machine, which would start no
    // tasks and leave the array untouched; always use at least one task.
    var degreeOfParallelism = Math.Max(1, Environment.ProcessorCount / 2);

    var tasks = new Task[degreeOfParallelism];

    for (int taskNumber = 0; taskNumber < degreeOfParallelism; taskNumber++)
    {
        // Copy the loop variable: the lambda would otherwise capture the shared
        // taskNumber, which keeps changing while the tasks are being started.
        int taskNumberCopy = taskNumber;

        tasks[taskNumber] = Task.Factory.StartNew(
            () =>
            {
                // Compute both bounds in 64-bit arithmetic so that
                // array.Length * (taskNumberCopy + 1) cannot overflow for large
                // arrays on machines with many processors. Each quotient is at
                // most array.Length, so narrowing back to int is safe.
                long length = array.Length;
                int start = (int)(length * taskNumberCopy / degreeOfParallelism);
                int max = (int)(length * (taskNumberCopy + 1) / degreeOfParallelism);

                for (int i = start; i < max; i++)
                {
                    array[i] = array[i] * factor;
                }
            });
    }

    Task.WaitAll(tasks);
}

/// <summary>
/// Multiplies every element of <paramref name="array"/> by <paramref name="factor"/>, in place,
/// with <c>Environment.ProcessorCount</c> tasks that each claim the next unprocessed index
/// from a shared counter. Neighbouring elements therefore tend to be written by different
/// tasks, which is how this variant provokes false sharing on purpose.
/// </summary>
/// <param name="array">Array whose elements are overwritten with the scaled values.</param>
/// <param name="factor">Multiplier applied to each element.</param>
static void CustomParallelFalseSharing(double[] array, double factor)
{
    int workerCount = Environment.ProcessorCount;
    Task[] workers = new Task[workerCount];

    // Shared cursor; starts at -1 so the first atomic increment hands out index 0.
    int cursor = -1;

    Action claimAndScale = () =>
    {
        for (int claimed = Interlocked.Increment(ref cursor);
            claimed < array.Length;
            claimed = Interlocked.Increment(ref cursor))
        {
            array[claimed] *= factor;
        }
    };

    for (int slot = 0; slot < workers.Length; slot++)
    {
        workers[slot] = Task.Factory.StartNew(claimAndScale);
    }

    Task.WaitAll(workers);
}

示例输出:

Serial: 0,20 s
Parallel.For: 0,50 s
Parallel.For (degree of parallelism): 8,90 s
Custom parallel: 0,33 s
Custom parallel (extracted max): 0,18 s
Custom parallel (extracted max, half parallelism): 0,18 s
Custom parallel (false sharing): 7,53 s
Serial: 0,21 s
Parallel.For: 0,52 s
Parallel.For (degree of parallelism): 0,36 s
Custom parallel: 0,31 s
Custom parallel (extracted max): 0,18 s
Custom parallel (extracted max, half parallelism): 0,19 s
Custom parallel (false sharing): 7,59 s
Serial: 0,21 s
Parallel.For: 11,21 s
Parallel.For (degree of parallelism): 0,36 s
Custom parallel: 0,32 s
Custom parallel (extracted max): 0,18 s
Custom parallel (extracted max, half parallelism): 0,18 s
Custom parallel (false sharing): 7,76 s
Serial: 0,21 s
Parallel.For: 0,46 s
Parallel.For (degree of parallelism): 0,35 s
Custom parallel: 0,31 s
Custom parallel (extracted max): 0,18 s
Custom parallel (extracted max, half parallelism): 0,18 s
Custom parallel (false sharing): 7,58 s
Serial: 0,21 s
Parallel.For: 0,45 s
Parallel.For (degree of parallelism): 0,40 s
Custom parallel: 0,38 s
Custom parallel (extracted max): 0,18 s
Custom parallel (extracted max, half parallelism): 0,18 s
Custom parallel (false sharing): 7,58 s
于 2012-09-13T13:42:05.707 回答
19

Svick 已经提供了一个很好的答案,但我想强调的是,关键并不在于用“手动并行化代码”来代替 Parallel.For(),而在于您必须让每个工作单元处理更大的数据块。

仍然可以像这样使用 Parallel.For() 来做到这一点:

/// <summary>
/// Multiplies every element of <paramref name="array"/> by <paramref name="factor"/>, in place,
/// by running <c>Parallel.For</c> over worker ids rather than over elements: each worker
/// processes one contiguous range of the array with a plain inner loop.
/// </summary>
/// <param name="array">Array whose elements are overwritten with the scaled values.</param>
/// <param name="factor">Multiplier applied to each element.</param>
static void My(double[] array, double factor)
{
    int degreeOfParallelism = Environment.ProcessorCount;

    Parallel.For(0, degreeOfParallelism, workerId =>
    {
        // Compute both bounds in 64-bit arithmetic: in 32-bit,
        // array.Length * (workerId + 1) overflows for large arrays on machines
        // with many processors. Each quotient is at most array.Length, so
        // narrowing back to int is safe.
        long length = array.Length;
        int start = (int)(length * workerId / degreeOfParallelism);
        int max = (int)(length * (workerId + 1) / degreeOfParallelism);

        for (int i = start; i < max; i++)
        {
            array[i] = array[i] * factor;
        }
    });
}

它与 svick 的 CustomParallelExtractedMax() 做同样的事情,但更短、更简单,并且(在我的机器上)执行速度甚至更快:

Serial: 3,94 s
Parallel.For: 9,28 s
Parallel.For (degree of parallelism): 9,58 s
Custom parallel: 2,05 s
Custom parallel (extracted max): 1,19 s
Custom parallel (extracted max, half parallelism): 1,49 s
Custom parallel (false sharing): 27,88 s
My: 0,95 s

顺便说一句,所有其他答案中都缺少的关键词是“粒度”(granularity)。

于 2014-04-28T09:22:17.927 回答
9

请参阅PLINQ 和 TPL 的自定义分区器

执行 For 循环时,循环体作为委托提供给该方法。调用该委托的成本与虚方法调用大致相同。在某些情况下,并行循环的主体可能非常小,以至于每次循环迭代的委托调用成本变得很显著。在这种情况下,您可以使用 Partitioner.Create 的某个重载,在源元素上创建一个由范围分区组成的 IEnumerable<T>。然后,您可以将这个范围集合传递给 ForEach 方法,其循环体由常规 for 循环组成。这种方法的好处是,委托调用成本在每个范围内只产生一次,而不是每个元素一次。

在您的循环体中,您正在执行单次乘法,并且委托调用的开销将非常明显。

尝试这个:

/// <summary>
/// Multiplies every element of <paramref name="array"/> by <paramref name="factor"/>, in place,
/// using a range partitioner so that the parallel-loop delegate is invoked once per index
/// range and each range is processed by a plain inner loop.
/// </summary>
/// <param name="array">Array whose elements are overwritten with the scaled values. An empty array is a no-op.</param>
/// <param name="factor">Multiplier applied to each element.</param>
public static void MultiplicateArray(double[] array, double factor)
{
    // Partitioner.Create(from, to) throws ArgumentOutOfRangeException when
    // to <= from, so an empty array has to be handled before partitioning.
    if (array.Length == 0)
    {
        return;
    }

    var rangePartitioner = Partitioner.Create(0, array.Length);

    Parallel.ForEach(rangePartitioner, range =>
    {
        // range is a half-open [Item1, Item2) interval of indices.
        for (int i = range.Item1; i < range.Item2; i++)
        {
            array[i] = array[i] * factor;
        }
    });
}

另请参阅:Parallel.ForEach 文档和 Partitioner.Create 文档。

于 2014-02-06T21:55:15.370 回答
6

Parallel.For 涉及更复杂的内存管理。该结果可能会因 CPU 规格而异,例如 #cores、L1 和 L2 缓存...

请看这篇有趣的文章:

http://msdn.microsoft.com/en-us/magazine/cc872851.aspx

于 2012-09-13T13:25:41.887 回答
-6

来自 http://msdn.microsoft.com/en-us/library/system.threading.tasks.parallel.aspx 和 http://msdn.microsoft.com/en-us/library/dd537608.aspx:

您没有创建三个执行 for 的线程/进程,而是尝试并行执行 for 的迭代,因此即使只有一个 for 您正在使用多个线程/进程。

这意味着 index = 0 和 index = 1 的迭代可以同时执行。

可能您被迫使用过多的线程/进程,并且创建/执行它们的开销大于速度增益。

尝试在三个不同的线程/进程中分别运行三个普通的 for 循环;如果您的系统是多核的(至少 3 核),它应该花费不到一分钟。

于 2012-09-13T13:33:16.377 回答