3

在我的 Parallel.ForEach 循环中,localFinally 委托确实在所有线程上被调用。我发现当我的并行循环停止时会发生这种情况。在我的并行循环中,我有大约三个条件检查阶段,它们在循环完成之前返回。似乎是当线程从这些阶段返回而不是整个主体的执行时,它才不执行 localFinally 委托。

循环结构如下:

 var startingThread = Thread.CurrentThread;
 Parallel.ForEach(fullList, opt,
         ()=> new MultipleValues(),
         (item, loopState, index, loop) =>
         {
            if (cond 1)
                return loop;
            if (cond 2)
                {
                process(item);
                return loop;
                }
            if (cond 3)
                return loop;

            Do Work(item);
            return loop;
          },
          partial =>
           {
              Log State of startingThread and threads
            } );

我在一个小数据集上运行循环并详细记录,发现虽然 Parallel.ForEach 完成了所有迭代,并且 localFinally 的最后一个线程的日志是 -- Calling Thread State is WaitSleepJoin for Thread 6 Loop Indx
16循环仍然没有优雅地完成并且仍然停滞不前......任何线索为什么会停滞?

干杯!

4

2 回答 2

1

在看到 localFinally 的定义(在每个线程完成后执行)后进行了快速测试运行,这让我怀疑这可能意味着并行创建的线程比执行的循环少得多。例如

        var test = new List<List<string>> ();
        for (int i = 0; i < 1000; i++)
        {
            test.Add(null);
        }

        int finalcount = 0;
        int itemcount = 0;
        int loopcount = 0;

        Parallel.ForEach(test, () => new List<string>(),
            (item, loopState, index, loop) =>
            {
                Interlocked.Increment(ref loopcount);
                loop.Add("a");
                //Thread.Sleep(100);
                return loop;
            },
            l =>
            {
                Interlocked.Add(ref itemcount, l.Count);                    
                Interlocked.Increment(ref finalcount);                    
            });

在此循环结束时,itemcount 和 loopcount 为 1000,如预期的那样,并且(在我的机器上)finalcount 为 1 或 2,具体取决于执行速度。在有条件的情况下:直接返回时,执行速度可能要快得多,并且不需要额外的线程。只有在执行任务时才需要更多线程。但是参数(在我的例子中是 l)包含所有执行的组合列表。这可能是日志记录差异的原因吗?

于 2012-06-07T12:14:31.397 回答
1

我想你只是误解了什么localFinally意思。不是为每个项目调用它,而是为每个使用的线程调用它Parallel.ForEach()。许多项目可以共享同一个线程。

它存在的原因是您可以在每个线程上独立执行一些聚合,并且仅在最后将它们连接在一起。这样,您只需在非常小的一段代码中处理同步(并让它影响您的性能)。

例如,如果您想计算一组项目的得分总和,您可以这样做:

int totalSum = 0;
Parallel.ForEach(
    collection, item => Interlocked.Add(ref totalSum, ComputeScore(item)));

但是在这里,您调用Interlocked.Add()每个项目,这可能很慢。使用localInitand localFinally,您可以像这样重写代码:

int totalSum = 0;
Parallel.ForEach(
    collection,
    () => 0,
    (item, state, localSum) => localSum + ComputeScore(item),
    localSum => Interlocked.Add(ref totalSum, localSum));

请注意,代码Interlocked.Add()仅在 中使用,localFinally并且确实访问了 中的全局状态body。这样,同步成本只需支付几次,每个线程使用一次。

注意:我Interlocked在这个例子中使用了,因为它非常简单而且很明显是正确的。如果代码更复杂,我会lock先使用,并尝试Interlocked仅在需要良好性能时使用。

于 2012-06-07T14:40:34.233 回答