15

当我在我的程序中使用 Parallel.ForEach 时,我发现有些线程似乎永远不会完成。事实上,它一遍又一遍地产生新线程,这是我没有预料到也绝对不想要的行为。

我能够使用以下代码重现此行为,就像我的“真实”程序一样,都大量使用处理器和内存(.NET 4.0 代码):

public class Node
{
    public Node Previous { get; private set; }

    public Node(Node previous)
    {
        Previous = previous;
    }
}

public class Program
{
    public static void Main(string[] args)
    {
        DateTime startMoment = DateTime.Now;
        int concurrentThreads = 0;

        var jobs = Enumerable.Range(0, 2000);
        Parallel.ForEach(jobs, delegate(int jobNr)
        {
            Interlocked.Increment(ref concurrentThreads);

            int heavyness = jobNr % 9;

            //Give the processor and the garbage collector something to do...
            List<Node> nodes = new List<Node>();
            Node current = null;
            for (int y = 0; y < 1024 * 1024 * heavyness; y++)
            {
                current = new Node(current);
                nodes.Add(current);
            }

            TimeSpan elapsed = DateTime.Now - startMoment;
            int threadsRemaining = Interlocked.Decrement(ref concurrentThreads);
            Console.WriteLine("[{0:mm\\:ss}] Job {1,4} complete. {2} threads remaining.", elapsed, jobNr, threadsRemaining);
        });
    }
}

当在我的四核上运行时,它最初以 4 个并发线程开始,正如您所期望的那样。然而,随着时间的推移,越来越多的线程被创建。最终,该程序会抛出 OutOfMemoryException:

[00:00] Job    0 complete. 3 threads remaining.
[00:01] Job    1 complete. 4 threads remaining.
[00:01] Job    2 complete. 4 threads remaining.
[00:02] Job    3 complete. 4 threads remaining.
[00:05] Job    9 complete. 5 threads remaining.
[00:05] Job    4 complete. 5 threads remaining.
[00:05] Job    5 complete. 5 threads remaining.
[00:05] Job   10 complete. 5 threads remaining.
[00:08] Job   11 complete. 5 threads remaining.
[00:08] Job    6 complete. 5 threads remaining.
...
[00:55] Job   67 complete. 7 threads remaining.
[00:56] Job   81 complete. 8 threads remaining.
...
[01:54] Job  107 complete. 11 threads remaining.
[02:00] Job  121 complete. 12 threads remaining.
..
[02:55] Job  115 complete. 19 threads remaining.
[03:02] Job  166 complete. 21 threads remaining.
...
[03:41] Job  113 complete. 28 threads remaining.
<OutOfMemoryException>

上面实验的内存使用图如下:

处理器和内存使用情况

屏幕截图是荷兰语;上半部分表示处理器使用情况,下半部分表示内存使用情况。)如您所见,几乎每次垃圾收集器妨碍时都会产生一个新线程(可以看出在内存使用量下降的情况下)。

谁能解释为什么会发生这种情况,以及我能做些什么?我只想让 .NET 停止产生新线程,并首先完成现有线程......

4

3 回答 3

17

您可以通过指定具有属性集的ParallelOptions实例来限制创建的最大线程数:MaxDegreeOfParallelism

var jobs = Enumerable.Range(0, 2000);
ParallelOptions po = new ParallelOptions
{ 
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

Parallel.ForEach(jobs, po, jobNr =>
{
    // ...
});

至于为什么你会得到你正在观察的行为:默认情况下,TPL (它是 PLINQ 的基础)可以自由猜测要使用的最佳线程数。每当并行任务阻塞时,任务调度程序可能会创建一个新线程以保持进度。在您的情况下,阻塞可能是隐式发生的;例如,通过Console.WriteLine调用,或(如您所见)在垃圾收集期间。

使用任务并行库进行并发级别调整(使用多少线程?)

由于 TPL 默认策略是每个处理器使用一个线程,因此我们可以得出结论,TPL 最初假设任务的工作负载约为 100% 工作和 0% 等待,如果初始假设失败并且任务进入等待状态 (即开始阻塞)- TPL 可以随意添加适当的线程。

于 2012-12-26T10:16:56.930 回答
7

您可能应该阅读一些有关任务调度程序如何工作的信息。

http://msdn.microsoft.com/en-us/library/ff963549.aspx(页面后半部分)

“.NET 线程池自动管理池中工作线程的数量。它根据内置的启发式方法添加和删除线程。.NET 线程池有两种主要的线程注入机制:增加工作线程的饥饿避免机制如果它发现在排队的项目上没有任何进展,则线程和爬山启发式尝试在使用尽可能少的线程的同时最大化吞吐量。

饥饿避免的目标是防止死锁。当工作线程等待同步事件时,可能会发生这种死锁,而该同步事件只能由仍在线程池的全局或本地队列中挂起的工作项来满足。如果有固定数量的工作线程,并且所有这些线程都被类似地阻塞,那么系统将无法取得进一步的进展。添加一个新的工作线程解决了这个问题。

爬山启发式的一个目标是在线程被 I/O 或其他使处理器停止的等待条件阻塞时提高内核的利用率。默认情况下,托管线程池每个核心有一个工作线程。如果这些工作线程中的一个被阻塞,则内核可能未被充分利用,具体取决于计算机的整体工作负载。线程注入逻辑不区分被阻塞的线程和正在执行冗长的处理器密集型操作的线程。因此,只要线程池的全局或本地队列包含待处理的工作项,运行时间较长(超过半秒)的活动工作项就会触发创建新的线程池工作线程。”

您可以将任务标记为LongRunning,但这具有从线程池外部为其分配线程的副作用,这意味着该任务不能被内联。

请记住,ParallelFor将给定的工作视为块,因此即使一个循环中的工作相当小,由查找调用的任务完成的总体工作对调度程序来说可能看起来更长。

大多数对 GC 的调用都不会阻塞(它在单独的线程上运行),但是如果您等待 GC 完成,那么它会阻塞。还要记住 GC 正在重新排列内存,因此如果您在运行 GC 时尝试分配内存,这可能会产生一些副作用(和阻塞)。我在这里没有具体细节,但我知道 PPL 有一些内存分配功能,专门用于并发内存管理。

查看代码的输出,似乎事情运行了很多秒。所以我对你看到线程注入并不感到惊讶。但是我似乎记得默认线程池大小大约是 30 个线程(可能取决于系统上的内核数量)。在你的代码分配之前,一个线程大约占用了 1 MB 的内存,所以我不清楚为什么你会在这里得到一个内存不足的异常。

于 2013-01-16T20:41:29.820 回答
1

我已经发布了后续问题“如何计算 .NET 应用程序中的并发线程数量?”

如果直接计算线程数,它们在 Parallel.For() 中的数量大部分((非常罕见且微不足道地减少)只会增加并且不会在循环完成后释放。

在 Release 和 Debug 模式下都检查了这个,用

ParallelOptions po = new ParallelOptions
{
  MaxDegreeOfParallelism = Environment.ProcessorCount
};

并且没有

数字不同,但结论是一样的。

如果有人想玩,这是我正在使用的现成代码:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Edit4Posting
{
public class Node
{

  public Node Previous { get; private set; }
  public Node(Node previous)
  {
    Previous = previous;
    }
  }
  public class Edit4Posting
  {

    public static void Main(string[] args)
    {
      int concurrentThreads = 0;
      int directThreadsCount = 0;
      int diagThreadCount = 0;

      var jobs = Enumerable.Range(0, 160);
      ParallelOptions po = new ParallelOptions
      {
        MaxDegreeOfParallelism = Environment.ProcessorCount
      };
      Parallel.ForEach(jobs, po, delegate(int jobNr)
      //Parallel.ForEach(jobs, delegate(int jobNr)
      {
        int threadsRemaining = Interlocked.Increment(ref concurrentThreads);

        int heavyness = jobNr % 9;

        //Give the processor and the garbage collector something to do...
        List<Node> nodes = new List<Node>();
        Node current = null;
        //for (int y = 0; y < 1024 * 1024 * heavyness; y++)
        for (int y = 0; y < 1024 * 24 * heavyness; y++)
        {
          current = new Node(current);
          nodes.Add(current);
        }
        //*******************************
        directThreadsCount = Process.GetCurrentProcess().Threads.Count;
        //*******************************
        threadsRemaining = Interlocked.Decrement(ref concurrentThreads);
        Console.WriteLine("[Job {0} complete. {1} threads remaining but directThreadsCount == {2}",
          jobNr, threadsRemaining, directThreadsCount);
      });
      Console.WriteLine("FINISHED");
      Console.ReadLine();
    }
  }
}
于 2013-03-15T06:45:54.867 回答