4

在斯蒂芬图布的书的第 88 页上

http://www.microsoft.com/download/en/details.aspx?id=19222

有代码

private BlockingCollection<T> _streamingData = new BlockingCollection<T>();
// Parallel.ForEach
Parallel.ForEach(_streamingData.GetConsumingEnumerable(),
item => Process(item));
// PLINQ
var q = from item in _streamingData.GetConsumingEnumerable().AsParallel()
...
select item;

斯蒂芬然后提到

“将调用 GetConsumingEnumerable 的结果作为数据源传递给 Parallel.ForEach 时,循环使用的线程有可能在集合变空时阻塞。阻塞的线程可能不会被 Parallel.ForEach 释放回 ThreadPool “

我不明白为什么线程数会增加?

如果集合为空,那么blockingcollection 不会请求任何进一步的线程吗?

因此,您不需要使用 WithDegreeOfParallelism 来限制 BlockingCollection 上使用的线程数

4

1 回答 1

3

线程池有一个爬山算法,用于估计适当的线程数。只要增加线程增加吞吐量,线程池就会创建更多线程。它将假设发生了一些阻塞或 IO,并尝试通过超过系统中处理器的数量来使 CPU 饱和。

这就是为什么在线程池线程上执行 IO 和阻塞操作可能很危险。

这是所述行为的完整工作示例:

        BlockingCollection<string> _streamingData = new BlockingCollection<string>();

        Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 100; i++)
                {
                    _streamingData.Add(i.ToString());
                    Thread.Sleep(100);
                }
            });

        new Thread(() =>
            {
                while (true)
                {
                    Thread.Sleep(1000);
                    Console.WriteLine("Thread count: " + Process.GetCurrentProcess().Threads.Count);
                }
            }).Start();

        Parallel.ForEach(_streamingData.GetConsumingEnumerable(), item =>
            {
            });

我不知道为什么线程数一直在攀升,尽管它并没有增加吞吐量。根据我解释的模型,它不会增长。但我不知道我的模型是否真的正确。

也许线程池有一个额外的启发式方法,如果它根本看不到任何进展(以每秒完成的任务来衡量),它就会产生线程。这是有道理的,因为这可能会防止应用程序中出现很多死锁。如果重要任务由于等待现有任务退出并使线程可用而无法运行,则可能会发生死锁。这是线程池的一个众所周知的问题。

于 2012-01-28T16:19:35.520 回答