7

好的,这是我的问题。我想启动线程直到某个数量。假设是 100。所以它将开始启动线程并连续检查正在运行的线程数。当达到最大数量时,它将停止启动新线程。但是如果有适当的检查间隔或完成的线程将发出信号,它将启动新线程。

通过这种方式,我将始终拥有一定数量的运行线程。

我通过使用睡眠和永久时间来解决这个问题。所以我不断检查给定时间间隔内的总运行线程数,如果线程完成,则处理它并启动一个新线程。

但是我的解决方案并不是正确的方法。我想如果完成的线程会发出信号,然后如果我们低于最大线程数阈值,检查器会启动一个新线程会更好。

我看到了许多线程池示例,但其中大多数不包含任何具有最大运行线程数的队列池。我的意思是,他们只是继续启动线程,直到完成。但是可以说我有 500k 的网址可以收获。我不能只在带有线程池的 for 循环中启动所有这些。

平台是 c# 4.5 WPF 应用程序

下面是我的解决方案。其实我正在寻找一个更好的。不改进这一点。

private void Button_Click_4(object sender, RoutedEventArgs e)
{
    Task.Factory.StartNew(() =>
    {
        startCrawler();
    });
}

void startCrawler()
{
    int irMaximumThreadcount = 100;
    List<Task> lstStartedThreads = new List<Task>();
    while (true)
    {
        for (int i = 0; i < lstStartedThreads.Count; i++)
        {
            if (lstStartedThreads[i].IsCompleted == true)
            {
                lstStartedThreads[i].Dispose();
                lstStartedThreads.RemoveAt(i);
            }
        }

        if (lstStartedThreads.Count < irMaximumThreadcount)
        {
            var vrTask = Task.Factory.StartNew(() =>
            {
                func_myTask();
            });
            lstStartedThreads.Add(vrTask);
        }

        System.Threading.Thread.Sleep(50);
    }
}

void func_myTask()
{

}
4

5 回答 5

6

我个人会为此使用PLINQ,特别是WithDegreeOfParallelism方法,它将并发执行的数量限制为传入的值。

private IEnumerable<Action> InfiniteFunctions()
{
    while(true)
    {
        yield return func_myTask;
    }
}

private void Button_Click_4(object sender, RoutedEventArgs e)
{
    int irMaximumThreadcount = 100;
    InfiniteFunctions()
        .AsParallel()
        .WithDegreeOfParallelism(irMaximumThreadcount)
        .ForAll(f => f());
}

编辑:实际上阅读文档,似乎 irMaximumThreadCount 最多只能为 64,因此请注意这一点。

编辑 2:好的,看起来更好,它似乎Parallel.ForEach需要一个ParallelOptions参数,其中包含一个MaxDegreeOfParallelism不受限制的属性 -检查出来。所以你的代码可能是这样的:

private void CrawlWebsite(string url)
{
    //Implementation here
}

private void Button_Click_4(object sender, RoutedEventArgs e)
{
    var options = new ParallelOptions() 
    { 
        MaxDegreeOfParallelism = 2000 
    };

    Parallel.ForEach(massiveListOfUrls, options, CrawlWebsite);
}
于 2013-03-03T02:19:57.797 回答
4

您正在将任务与线程混合在一起。任务不是线程。不能保证每个任务都有自己的线程

实际上 TPL(任务并行库)是某种队列。这意味着您可以为您拥有的每个FuncAction对象创建和启动任务。没有简单的方法来控制实际创建的线程数。

但是,您可以以很少的开销创建许多任务,因为 TPL 会将它们排入队列并应用进一步的逻辑来平衡线程池线程上的工作。

如果某些任务需要一个接一个地执行,您可以使用Task.ContinueWith它们来排队。也可以使用Task.Factory.ContinueWhenAny或开始新任务Task.Factory.ContinueWhenAll

这也是如何控制要创建的并行任务数量的线索:只需创建所需数量的任务并使用ContinueWhenAny. 每次任务结束时,下一个任务就会开始。

同样:TPL 将平衡线程池中线程之间的工作。无论如何,您需要考虑的是其他资源的使用,例如磁盘 I/O 或 Internet 连接。有很多任务同时尝试使用相同的资源会大大减慢你的程序。

于 2013-03-03T03:14:51.647 回答
1

.NET 4.0 引入了几个具有内置并发管理的集合,它们应该非常适合这种情况。阻塞收集将比在 while 循环中休眠更有效。然后,您只需生成从阻塞队列中读取的 x 个线程。

BlockingCollection<string> queue = new BlockingCollection<string>(listOfUrls);

for (int x=0; x < MaxThreads; x++)
{
    Task.Factory.StartNew(() => 
    {
        while (true)
        {
            string url = queue.Take(); // blocks until url is available
            // process url;
        }
    }, TaskCreationOptions.LongRunning);
}

您将任务标记为长时间运行,因此它将创建自己的线程而不是使用线程池。如果您需要先进先出,您可以将 a 传递ConcurrentQueue<T>给阻塞集合构造函数。http://msdn.microsoft.com/en-us/library/dd287085.aspx

于 2013-03-03T03:09:28.097 回答
0

不是一个确切的答案,但我认为这可能会引导您朝着正确的方向前进。

首先,看一下Thread.Join,尤其是本页底部给出的简单示例。这种方法优于 Thread.Sleep() 并且更适合您的目的。我正在考虑 * Join *ing the "manager" thread 而不是 * Sleep *ing。

第二个可能适合也可能不适合您的目的的选项是新Tasks库。由于您使用的是最新版本的框架,因此此选项可用,但我猜您无法控制 Tasks 库创建的实际线程数。它会根据底层调度程序自动选择该值。但是,有一个名为ParallelOptions.MaxDegreeOfParallelism的选项听起来很有趣。

于 2013-03-03T02:20:29.823 回答
0

您可以自己管理任务/线程池并等待任何线程完成并立即启动一个新线程。

MAX_THREAD_ALLOWED = 100;
List<Task> tasks = new List<Task>();
for (int i = 0; i < 1000; i++)
{
    tasks.Add(Task.Run(() => { Foo(i); }));
    if (i == MAX_THREAD_ALLOWED)
    {
        Task.WaitAny(tasks.ToArray());
        MAX_THREAD_ALLOWED++;
    }
}
于 2019-05-15T09:40:04.273 回答