8

我正在使用 C# TPL,但生产者/消费者代码有问题......出于某种原因,TPL 不会重用线程并不断创建新线程而不会停止

我做了一个简单的例子来演示这种行为:

class Program
{
    static BlockingCollection<int> m_Buffer = new BlockingCollection<int>(1);
    static CancellationTokenSource m_Cts = new CancellationTokenSource();

    static void Producer()
    {
        try
        {
            while (!m_Cts.IsCancellationRequested)
            {
                Console.WriteLine("Enqueuing job");
                m_Buffer.Add(0);
                Thread.Sleep(1000);
            }
        }
        finally
        {
            m_Buffer.CompleteAdding();
        }
    }

    static void Consumer()
    {
        Parallel.ForEach(m_Buffer.GetConsumingEnumerable(), Run);
    }

    static void Run(int i)
    {
        Console.WriteLine
            ("Job Processed\tThread: {0}\tProcess Thread Count: {1}",
              Thread.CurrentThread.ManagedThreadId, 
              Process.GetCurrentProcess().Threads.Count);
    }

    static void Main(string[] args)
    {
        Task producer = new Task(Producer);
        Task consumer = new Task(Consumer);
        producer.Start();
        consumer.Start();

        Console.ReadKey();
        m_Cts.Cancel();

        Task.WaitAll(producer, consumer);
    }
}

此代码创建 2 个任务,生产者和消费者。Produces 每秒添加 1 个工作项,Consumer 只打印出一个带有信息的字符串。我会假设在这种情况下 1 个使用者线程就足够了,因为任务的处理速度比它们添加到队列中的速度要快得多,但实际发生的是进程中的每秒钟线程数增加 1... 好像TPL 正在为每个项目创建新线程

在试图了解发生了什么之后,我还注意到了另一件事:即使 BlockingCollection 的大小为 1,但一段时间后 Consumer 开始被突然调用,例如,它是这样开始的:

排队工作

作业处理线程:4 处理线程数:9

排队工作

作业处理线程:6 处理线程数:9

排队工作

作业处理线程:5 处理线程数:10

排队工作

作业处理线程:4 处理线程数:10

排队工作

作业处理线程:6 处理线程数:11

这就是它在不到一分钟后处理项目的方式:

排队工作

作业处理线程:25 处理线程数:52

排队工作

排队工作

作业处理线程:5 处理线程数:54

作业处理线程:5 处理线程数:54

并且因为线程在完成 Parallel.ForEach 循环后被释放(我没有在这个例子中展示它,但它是在真实的项目中)我认为它与 ForEach 具体有关......我找到了这篇文章http: //reedcopsey.com/2010/01/26/parallelism-in-net-part-5-partitioning-of-work/,我认为我的问题是由这个默认分区器引起的,所以我从 TPL 示例中获取了自定义分区器那是一项一项地为消费者线程提供服务,尽管它固定了执行顺序(摆脱了延迟)...

排队工作

作业处理线程:71 处理线程数:140

排队工作

作业处理线程:12 处理线程数:141

排队工作

作业处理线程:72 处理线程数:142

排队工作

作业处理线程:38 处理线程数:143

排队工作

作业处理线程:73 处理线程数:143

排队工作

作业处理线程:21 处理线程数:144

排队工作

作业处理线程:74 处理线程数:145

...它并没有阻止线程增长

我知道 ParallelOptions.MaxDegreeOfParallelism,但我仍然想了解 TPL 发生了什么以及为什么它会无缘无故地创建数百个线程

在我的项目中,我的代码必须运行数小时并从数据库中读取新数据,将其放入 BlockingCollections 并由其他代码处理数据,大约每 5 秒有 1 个新项目,它需要几毫秒到几乎一个分钟来处理它,运行大约 10 分钟后,线程数达到了 1000 多个线程

4

1 回答 1

6

有两件事共同导致了这种行为:

  1. ThreadPool尝试根据您的情况使用最佳线程数。但是,如果池中的一个线程阻塞,池会认为该线程没有做任何有用的工作,因此它倾向于在此之后很快创建另一个线程。这意味着如果您有很多阻塞,ThreadPool那么在猜测最佳线程数方面真的很糟糕,并且它倾向于创建新线程直到达到限制。

  2. Parallel.ForEach()信任ThreadPool猜测正确的线程数,除非您明确设置最大线程数。Parallel.ForEach()也主要用于有界集合,而不是数据流。

当你将这两件事与 结合起来时GetConsumingEnumerable(),你得到的是Parallel.ForEach()创建几乎总是被阻塞的线程。看到这ThreadPool一点,为了尽量保持 CPU 的利用率,创建了越来越多的线程。

这里正确的解决方案是设置MaxDegreeOfParallelism. 如果您的计算受 CPU 限制,则最佳值很可能是Environment.ProcessorCount. 如果它们受 IO 限制,则必须通过实验找出最佳值。

如果您可以使用 .Net 4.5,另一种选择是使用 TPL 数据流。这个库是专门用来处理数据流的,就像你有的那样,所以它没有你的代码有的问题。它实际上甚至比这更好,并且在当前不处理任何内容时根本不使用任何线程。

注意:为每个新项目创建一个新线程还有一个很好的理由,但解释这需要我Parallel.ForEach()更详细地解释如何工作,我觉得这里没有必要。

于 2012-08-30T08:22:28.193 回答