7

所以主题就是问题。

我得到该方法 AsParallel 返回ParallelQuery<TSource>使用相同 LINQ 关键字的包装器,但来自System.Linq.ParallelEnumerable而不是System.Linq.Enumerable

很清楚,但是当我查看反编译的源时,我不明白它是如何工作的。

让我们从一个最简单的扩展开始:Sum() 方法。代码:

[__DynamicallyInvokable]
public static int Sum(this ParallelQuery<int> source)
{
  if (source == null)
    throw new ArgumentNullException("source");
  else
    return new IntSumAggregationOperator((IEnumerable<int>) source).Aggregate();
}

很清楚,让我们去Aggregate()方法。它是 InternalAggregate 方法的包装器,可以捕获一些异常。现在让我们来看看它。

protected override int InternalAggregate(ref Exception singularExceptionToThrow)
{
  using (IEnumerator<int> enumerator = this.GetEnumerator(new ParallelMergeOptions?(ParallelMergeOptions.FullyBuffered), true))
  {
    int num = 0;
    while (enumerator.MoveNext())
      checked { num += enumerator.Current; }
    return num;
  }
}

问题是:它是如何工作的?我看不到变量的并发安全性,由许多线程修改,我们只看到迭代器和求和。它是魔术枚举器吗?或者它是如何工作的?GetEnumerator()返回QueryOpeningEnumerator<TOutput>,但它的代码太复杂了。

4

3 回答 3

2

最后在我的第二次PLINQ 攻击中,我找到了答案。这很清楚。问题是枚举器并不简单。这是一个特别的multithreading。那么它是如何工作的呢?答案是enumerator不返回源的下一个值,它返回下一个分区的总和。所以这段代码只执行了 2,4,6,8... 次(基于Environment.ProcessorCount),当实际的求和工作方法内部enumerator.MoveNextenumerator.OpenQuery执行时。

因此,TPL 显然对源可枚举进行了分区,然后对每个分区进行独立求和,然后执行此求和,请参阅IntSumAggregationOperatorEnumerator<TKey>. 这里没有魔法,只是可以更深入。

于 2016-02-29T13:15:23.083 回答
1

Sum运算符在单个线程中聚合所有值。这里没有多线程。诀窍是多线程正在其他地方发生。

PLINQSum方法可以处理 PLINQ 枚举。可以使用允许在多个线程上处理集合的其他构造(例如 where)来构建这些可枚举。

Sum 运算符始终是链中的最后一个运算符。虽然可以在多个线程上处理这个和,但 TPL 团队可能发现这对性能有负面影响,这是合理的,因为这个方法唯一要做的就是简单的整数加法。

因此,此方法处理来自其他线程的所有可用结果,并在单个线程上处理它们并返回该值。真正的诀窍在于其他 PLINQ 扩展方法。

于 2013-08-22T13:18:40.290 回答
-2
protected override int InternalAggregate(ref Exception singularExceptionToThrow)
{
  using (IEnumerator<int> enumerator = this.GetEnumerator(new ParallelMergeOptions?    (ParallelMergeOptions.FullyBuffered), true))
  {
    int num = 0;
    while (enumerator.MoveNext())
      checked { num += enumerator.Current; }
    return num;
  }
}

此代码不会并行执行,while 将在其内部范围内按顺序执行。

试试这个

        List<int> list = new List<int>();

        int num = 0;

        Parallel.ForEach(list, (item) =>
            {
                checked { num += item; }
            });

内部动作将在 ThreadPool 上展开,当处理完所有项时,ForEach 语句将完成。

在这里你需要线程安全:

        List<int> list = new List<int>();

        int num = 0;

        Parallel.ForEach(list, (item) =>
            {
                Interlocked.Add(ref num, item);
            });
于 2013-08-22T13:18:55.447 回答