9

我试图理解为什么 Parallel.For 在以下情况下能够胜过许多线程:考虑一批可以并行处理的作业。在处理这些工作时,可能会添加新工作,然后也需要处理这些工作。Parallel.For解决方案如下所示:

var jobs = new List<Job> { firstJob };
int startIdx = 0, endIdx = jobs.Count;
while (startIdx < endIdx) {
  Parallel.For(startIdx, endIdx, i => WorkJob(jobs[i]));
  startIdx = endIdx; endIdx = jobs.Count;
}

这意味着 Parallel.For 需要多次同步。考虑一个面包优先的图算法算法;同步的数量会很大。浪费时间,不是吗?

在老式线程方法中尝试相同的方法:

var queue = new ConcurrentQueue<Job> { firstJob };
var threads = new List<Thread>();
var waitHandle = new AutoResetEvent(false);
int numBusy = 0;
for (int i = 0; i < maxThreads; i++) 
  threads.Add(new Thread(new ThreadStart(delegate {
    while (!queue.IsEmpty || numBusy > 0) {
      if (queue.IsEmpty)
        // numbusy > 0 implies more data may arrive
        waitHandle.WaitOne();

      Job job;
      if (queue.TryDequeue(out job)) {
        Interlocked.Increment(ref numBusy);
        WorkJob(job); // WorkJob does a waitHandle.Set() when more work was found
        Interlocked.Decrement(ref numBusy);
      }
    }
    // others are possibly waiting for us to enable more work which won't happen
    waitHandle.Set(); 
})));
threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());

Parallel.For代码当然要干净得多,但我无法理解的是,它也更快!任务调度器就这么好吗?同步被消除了,没有忙碌的等待,但线程方法始终较慢(对我而言)。这是怎么回事?线程方法可以更快吗?

编辑:感谢所有答案,我希望我可以选择多个。我选择了一个也显示出实际可能改进的那个。

4

3 回答 3

14

这两个代码示例并不完全相同。

Parallel.ForEach()使用有限数量的线程并重新使用它们。第二个示例已经开始落后,因为必须创建多个线程。这需要时间。

的价值是maxThreads多少?非常关键,Parallel.ForEach()因为它是动态的。

任务调度器就这么好吗?

这是相当不错的。TPL 使用工作窃取和其他自适应技术。你将很难做得更好。

于 2012-10-25T14:06:30.200 回答
4

Parallel.For 实际上并没有将项目分解为单个工作单元。它根据计划使用的线程数和要执行的迭代次数分解所有工作(早期)。然后让每个线程同步处理该批次(可能使用工作窃取或保存一些额外的项目以在接近尾声时进行负载平衡)。通过使用这种方法,工作线程实际上永远不会互相等待,而由于您在每次迭代之前/之后使用的大量同步,您的线程会不断地互相等待。

最重要的是,由于它使用线程池线程,它需要的许多线程可能已经创建,这是它的另一个优势。

至于同步,Parallel.For 的全部意义在于所有迭代都可以并行完成,因此几乎不需要进行同步(至少在他们的代码中)。

然后当然还有线程数的问题。线程池有很多非常好的算法和启发式方法,可以帮助它根据当前硬件、其他应用程序的负载等确定在那一刻需要多少线程。你可能使用了太多,或没有足够的线程。

此外,由于在开始之前不知道您拥有的项目数量,我建议使用Parallel.ForEach而不是几个Parallel.For循环。它只是为您所处的情况而设计的,因此它的启发式方法会更好地应用。(它也使得代码更简洁。)

BlockingCollection<Job> queue = new BlockingCollection<Job>();

//add jobs to queue, possibly in another thread
//call queue.CompleteAdding() when there are no more jobs to run

Parallel.ForEach(queue.GetConsumingEnumerable(),
    job => job.DoWork());
于 2012-10-25T14:12:48.727 回答
2

您创建了一堆新线程,而 Parallel.For 正在使用线程池。如果您使用 C# 线程池,您会看到更好的性能,但这样做确实没有意义。

我会回避推出您自己的解决方案;如果有需要自定义的特殊情况,请使用 TPL 并自定义..

于 2012-10-25T14:11:02.917 回答