4

我正在BlockingCollection尝试更好地理解它们,但我很难理解为什么我的代码在我使用Parallel.For

我只是给它加了一个数字(制片人?):

var blockingCollection = new BlockingCollection<long>();

Task.Factory.StartNew(() =>
{
    while (count <= 10000)
    {
        blockingCollection.Add(count);
        count++;
    }
});

然后我正在尝试处理(消费者?):

Parallel.For(0, 5, x => 
{
    foreach (long value in blockingCollection.GetConsumingEnumerable())
    {
        total[x] += 1;
        Console.WriteLine("Worker {0}: {1}", x, value);
    }
});

但是当它处理完所有的数字时,它就挂在那里了?我究竟做错了什么?

另外,当我将 Parallel.For 设置为 5 时,这是否意味着它正在 5 个单独的线程上处理数据?

4

3 回答 3

5

顾名思义,BlockingCollection<T>当他们不能做任何事情时,在块上的操作,这包括GetConsumingEnumerable().

这样做的原因是该集合无法判断您的生产者是否已经完成,或者只是忙于生产下一个项目。

您需要做的是通过调用来通知集合您已完成向其中添加项目CompleteAdding()。例如:

while (count <= 10000)
{
    blockingCollection.Add(count);
    count++;
}

blockingCollection.CompleteAdding();
于 2016-02-25T09:01:46.447 回答
3

这是一个GetConsumingEnumerable方法特性。

如果没有可用的项目或集合为空,则以这种方式枚举集合会阻塞使用者线程。

你可以在这里阅读更多关于它的信息

同样 usingParallel.For(0,5)并不能保证数据将在 5 个单独的线程中处理。这取决于Environment.ProcessorCount.

于 2016-02-25T04:16:11.697 回答
1

另外,当我将 Parallel.For 设置为 5 时,是否意味着它正在 5 个单独的线程上处理数据?

不,引用 SO 中的先前答案(Parallel.For(Foreach) 将创建多少个线程?默认 MaxDegreeOfParallelism?):

任务并行库和 PLINQ 的默认调度程序使用 .NET Framework 线程池来排队和执行工作。在 .NET Framework 4 中,ThreadPool 使用 System.Threading.Tasks.Task 类型提供的信息来有效地支持并行任务和查询通常表示的细粒度并行性(短期工作单元)。

简单地说,TPL 创建任务,而不是线程。框架决定有多少线程应该处理它们。

于 2016-02-25T08:35:04.113 回答