我正在使用 C# TPL,但生产者/消费者代码有问题......出于某种原因,TPL 不会重用线程并不断创建新线程而不会停止
我做了一个简单的例子来演示这种行为:
class Program
{
static BlockingCollection<int> m_Buffer = new BlockingCollection<int>(1);
static CancellationTokenSource m_Cts = new CancellationTokenSource();
static void Producer()
{
try
{
while (!m_Cts.IsCancellationRequested)
{
Console.WriteLine("Enqueuing job");
m_Buffer.Add(0);
Thread.Sleep(1000);
}
}
finally
{
m_Buffer.CompleteAdding();
}
}
static void Consumer()
{
Parallel.ForEach(m_Buffer.GetConsumingEnumerable(), Run);
}
static void Run(int i)
{
Console.WriteLine
("Job Processed\tThread: {0}\tProcess Thread Count: {1}",
Thread.CurrentThread.ManagedThreadId,
Process.GetCurrentProcess().Threads.Count);
}
static void Main(string[] args)
{
Task producer = new Task(Producer);
Task consumer = new Task(Consumer);
producer.Start();
consumer.Start();
Console.ReadKey();
m_Cts.Cancel();
Task.WaitAll(producer, consumer);
}
}
此代码创建 2 个任务,生产者和消费者。Produces 每秒添加 1 个工作项,Consumer 只打印出一个带有信息的字符串。我会假设在这种情况下 1 个使用者线程就足够了,因为任务的处理速度比它们添加到队列中的速度要快得多,但实际发生的是进程中的每秒钟线程数增加 1... 好像TPL 正在为每个项目创建新线程
在试图了解发生了什么之后,我还注意到了另一件事:即使 BlockingCollection 的大小为 1,但一段时间后 Consumer 开始被突然调用,例如,它是这样开始的:
排队工作
作业处理线程:4 处理线程数:9
排队工作
作业处理线程:6 处理线程数:9
排队工作
作业处理线程:5 处理线程数:10
排队工作
作业处理线程:4 处理线程数:10
排队工作
作业处理线程:6 处理线程数:11
这就是它在不到一分钟后处理项目的方式:
排队工作
作业处理线程:25 处理线程数:52
排队工作
排队工作
作业处理线程:5 处理线程数:54
作业处理线程:5 处理线程数:54
并且因为线程在完成 Parallel.ForEach 循环后被释放(我没有在这个例子中展示它,但它是在真实的项目中)我认为它与 ForEach 具体有关......我找到了这篇文章http: //reedcopsey.com/2010/01/26/parallelism-in-net-part-5-partitioning-of-work/,我认为我的问题是由这个默认分区器引起的,所以我从 TPL 示例中获取了自定义分区器那是一项一项地为消费者线程提供服务,尽管它固定了执行顺序(摆脱了延迟)...
排队工作
作业处理线程:71 处理线程数:140
排队工作
作业处理线程:12 处理线程数:141
排队工作
作业处理线程:72 处理线程数:142
排队工作
作业处理线程:38 处理线程数:143
排队工作
作业处理线程:73 处理线程数:143
排队工作
作业处理线程:21 处理线程数:144
排队工作
作业处理线程:74 处理线程数:145
...它并没有阻止线程增长
我知道 ParallelOptions.MaxDegreeOfParallelism,但我仍然想了解 TPL 发生了什么以及为什么它会无缘无故地创建数百个线程
在我的项目中,我的代码必须运行数小时并从数据库中读取新数据,将其放入 BlockingCollections 并由其他代码处理数据,大约每 5 秒有 1 个新项目,它需要几毫秒到几乎一个分钟来处理它,运行大约 10 分钟后,线程数达到了 1000 多个线程