3

我有要处理的批次列表。永远。
我想并行执行每个块(5),并在完成后移至下一个块。
出于某种原因,下面的代码不会等待块完成并继续,即使它没有完成。

while (true)
{
    foreach (string[] urlsArr in chunks)
    { 
        int i = 0;
        foreach (var url in urlsArr)
        {
            ThreadPool.QueueUserWorkItem(x =>
            {
                ProccessUrl(url, config, drivers[i]);
                _resetEvent.Set();
                i++;
            });
        }
        _resetEvent.WaitOne();// this is not really waiting.
    }
}
4

3 回答 3

1

看看Semaphore或它的苗条版本。信号量将允许您始终只运行 5 个线程。一旦这些正在运行的线程中的任何一个完成,它就可以开始新的工作。这更有效,尤其是在工作负载不均匀的情况下。考虑一个项目需要一个小时来处理而其他 4 个需要一秒钟的情况。在这种情况下,4 个线程将等待最后一个线程完成,然后再进行任何其他工作。

有关示例,请参阅需要了解 SemaphoreSlim 的用法

在您的代码中,问题是您只有一个等待句柄和 5 个线程。当 5 个正在运行的线程中的任何一个完成工作时,它将设置等待句柄,从而允许您的外部循环继续进行,从而启动另外 5 个线程。到现在为止,内循环的前 4 个线程可能已经完成,并且它们中的任何一个都可以再次设置等待句柄!现在,你看到这里有问题吗?

根据 Hans 的评论,如果单个批次中的工作项之间存在依赖关系,那么在开始下一批之前必须完成所有工作项,您应该看看CountDownEvent

于 2015-04-06T09:54:37.587 回答
1

这是一个带有 Tasks(async/await) 的版本

while (true)
        {
            foreach (string[] urlsArr in chunks)
            {
                Task[] tasks = new Task[urlsArr.Length];
                for (int i = 0; i < urlsArr.Length; i++)
                {
                    var url = urlsArr[i];
                    var driver = drivers[i];
                    tasks[i] = Task.Run(() => { ProccessUrl(url, config, driver); });
                }

                await Task.WhenAll(tasks);
            }
        }

请注意,它还解决了原始代码中未以线程安全方式递增的“i”变量的问题(可以使用 Interlocked.Increment 修复)。

如果您的代码不是async,您可以等待任务在线程中完成(但这是阻塞的)

Task.WhenAll(tasks).Wait();
于 2015-04-06T10:58:47.310 回答
0

我认为您也许可以简化整个事情,并利用Parallel.ForEach()来管理线程并将并发度限制为 5。

如果您运行以下示例代码,您将看到假冒的 URL 以 5 个为一组进行处理,因为并发线程的数量被限制为 5。

如果你这样做,你将不需要你自己的分块逻辑:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication2
{
    class Program
    {
        static void Main()
        {
            // Make some pretend URLs for this demo.

            string[] urls = Enumerable.Range(1, 100).Select(n => n.ToString()).ToArray();

            // Use Parallel.ForEach() along with MaxDegreeOfParallelism = 5 to process
            // these using 5 threads maximum:

            Parallel.ForEach(
                urls,
                new ParallelOptions{MaxDegreeOfParallelism = 5},
                processUrl
            );
        }

        static void processUrl(string url)
        {
            Console.WriteLine("Processing " + url);
            Thread.Sleep(1000);
            Console.WriteLine("Processed " + url);
        }
    }
}
于 2015-04-06T10:21:27.437 回答