37

考虑一个包含许多需要处理的作业的队列。队列的限制是一次只能获得一份工作,并且无法知道有多少工作。这些作业需要 10 秒才能完成,并且需要大量等待来自 Web 服务的响应,因此不受 CPU 限制。

如果我使用这样的东西

while (true)
{
   var job = Queue.PopJob();
   if (job == null)
      break;
   Task.Factory.StartNew(job.Execute); 
}

然后它会以比完成它们的速度更快的速度从队列中快速弹出作业,耗尽内存并跌倒在它的屁股上。>.<

我不能使用(我不认为)ParallelOptions.MaxDegreeOfParallelism,因为我不能使用 Parallel.Invoke 或 Parallel.ForEach

我找到了 3 个替代方案

  1. 将 Task.Factory.StartNew 替换为

    Task task = new Task(job.Execute,TaskCreationOptions.LongRunning)
    task.Start();
    

    这似乎在一定程度上解决了这个问题,但我不清楚这是在做什么以及这是否是最好的方法。

  2. 创建限制并发程度的自定义任务调度程序

  3. 使用BlockingCollection之类的东西在启动时将作业添加到集合中,并在完成时删除以限制可以运行的数量。

对于#1,我必须相信自动做出正确的决定,#2/#3 我必须计算出可以自己运行的最大任务数。

我是否正确理解了这一点-这是更好的方法,还是有其他方法?

编辑- 这是我从下面的答案中得出的,生产者 - 消费者模式。

以及整体吞吐量目标不是使作业出队的速度超过处理速度,并且没有多个线程轮询队列(此处未显示,但这是一个非阻塞操作,如果从多个地方以高频率轮询将导致巨大的交易成本) .

// BlockingCollection<>(1) will block if try to add more than 1 job to queue (no
// point in being greedy!), or is empty on take.
var BlockingCollection<Job> jobs = new BlockingCollection<Job>(1);

// Setup a number of consumer threads.
// Determine MAX_CONSUMER_THREADS empirically, if 4 core CPU and 50% of time
// in job is blocked waiting IO then likely be 8.
for(int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
{
   Thread consumer = new Thread(() =>
   {
      while (!jobs.IsCompleted)
      {
         var job = jobs.Take();
         job.Execute();
      }
   }
   consumer.Start();
}

// Producer to take items of queue and put in blocking collection ready for processing
while (true)
{
    var job = Queue.PopJob();
    if (job != null)
       jobs.Add(job);
    else
    {
       jobs.CompletedAdding()
       // May need to wait for running jobs to finish
       break;
    }
}
4

6 回答 6

22

我刚刚给出了一个非常适用于这个问题的答案。

基本上,TPL Task 类用于调度 CPU 密集型工作。它不是为阻止工作而设计的。

您正在使用不是 CPU 的资源:等待服务回复。这意味着 TPL 将错误地管理您的资源,因为它在一定程度上假定 CPU 有界性。

自己管理资源:启动固定数量的线程或LongRunning任务(基本相同)。根据经验确定线程数。

您不能将不可靠的系统投入生产。出于这个原因,我推荐 #1 但节流。不要创建与工作项一样多的线程。创建使远程服务饱和所需的尽可能多的线程。为自己编写一个辅助函数,它产生 N 个线程并使用它们来处理 M 个工作项。通过这种方式,您可以获得完全可预测且可靠的结果。

于 2012-06-21T13:43:00.947 回答
12

稍后在您的代码或 3rd 方库中由 , 引起的潜在流拆分和延续await不会很好地与长时间运行的任务(或线程)一起使用,因此不要打扰使用长时间运行的任务。在这个async/await世界上,它们是无用的。更多细节在这里

您可以调用ThreadPool.SetMaxThreads,但在进行此调用之前,请确保使用 设置最小线程数ThreadPool.SetMinThreads,使用低于或等于最大值的值。顺便说一句,MSDN 文档是错误的。通过这些方法调用,您可以低于机器上的内核数量,至少在 .NET 4.5 和 4.6 中,我使用这种技术来降低内存有限的 32 位服务的处理能力。

但是,如果您不想限制整个应用程序,而只是限制它的处理部分,则自定义任务调度程序将完成这项工作。很久以前,MS 发布了带有几个自定义任务调度程序的示例,包括一个LimitedConcurrencyLevelTaskScheduler. 使用 手动生成主处理任务Task.Factory.StartNew,提供自定义任务调度程序,由它生成的所有其他任务都将使用它,包括async/await甚至Task.Yield用于在async方法的早期实现异步。

但是对于您的特定情况,两种解决方案都不会在完成之前停止耗尽您的工作队列。这可能是不可取的,具体取决于您的队列的实现和目的。它们更像是“触发一堆任务并让调度程序找到时间执行它们”类型的解决方案。因此,也许这里更合适的东西可能是一种更严格的方法来控制作业的执行semaphores。代码如下所示:

semaphore = new SemaphoreSlim(max_concurrent_jobs);

while(...){
 job = Queue.PopJob();
 semaphore.Wait();
 ProcessJobAsync(job);
}

async Task ProcessJobAsync(Job job){
 await Task.Yield();
 ... Process the job here...
 semaphore.Release();
}

给猫剥皮的方法不止一种。使用你认为合适的东西。

于 2016-03-19T19:28:10.030 回答
8

Microsoft 有一个非常酷的库,称为 DataFlow,它完全可以满足您的需求(以及更多)。详情在这里

您应该使用 ActionBlock 类并设置 ExecutionDataflowBlockOptions 对象的 MaxDegreeOfParallelism。ActionBlock 与 async/await 配合得很好,因此即使在等待外部调用时,也不会开始处理新作业。

ExecutionDataflowBlockOptions actionBlockOptions = new ExecutionDataflowBlockOptions
{
     MaxDegreeOfParallelism = 10
};

this.sendToAzureActionBlock = new ActionBlock<List<Item>>(async items => await ProcessItems(items),
            actionBlockOptions);
...
this.sendToAzureActionBlock.Post(itemsToProcess)
于 2015-01-23T07:02:35.383 回答
7

这里的问题似乎不是running太多,而是 scheduleTask太多。您的代码将尝试调度尽可能多的 s,无论它们执行的速度有多快。如果你有太多的工作,这意味着你会得到OOM。 TaskTask

因此,您提出的任何解决方案都不会真正解决您的问题。如果看起来简单地指定LongRunning可以解决您的问题,那么这很可能是因为创建一个新的Thread(这就是什么LongRunning)需要一些时间,这有效地限制了获得新工作。因此,此解决方案只能在偶然情况下起作用,并且很可能会在以后导致其他问题。

关于解决方案,我大多同意 usr:运行良好的最简单的解决方案是创建固定数量的LongRunning任务,并有一个循环调用(如果该方法不是线程安全的,Queue.PopJob()则由 a 保护)并完成工作。lockExecute()

更新:经过更多思考,我意识到以下尝试很可能会表现得很糟糕。仅当您确实确定它对您有用时才使用它。


但是 TPL 试图找出最好的并行度,即使对于 IO-bound 也是如此Task。因此,您可能会尝试利用它来发挥自己的优势。Long Tasks 在这里是行不通的,因为从 TPL 的角度来看,似乎没有做任何工作,它会Task一遍又一遍地启动 new s。相反,您可以做的是Task在每个Task. 这样,TPL 就会知道发生了什么,并且它的算法可能运行良好。此外,为了让 TPL 决定并行度,Task在其行中第一个的 a 的开头,开始另一行Tasks。

该算法可能运行良好。但也有可能 TPL 会在并行度方面做出错误的决定,我实际上还没有尝试过这样的事情。

在代码中,它看起来像这样:

void ProcessJobs(bool isFirst)
{
    var job = Queue.PopJob(); // assumes PopJob() is thread-safe
    if (job == null)
        return;

    if (isFirst)
        Task.Factory.StartNew(() => ProcessJobs(true));

    job.Execute();

    Task.Factory.StartNew(() => ProcessJob(false));
}

然后开始

Task.Factory.StartNew(() => ProcessJobs(true));
于 2012-06-21T13:59:26.887 回答
1

TaskCreationOptions.LongRunning对于阻止任务很有用,在这里使用它是合法的。它的作用是建议调度程序将一个线程专用于该任务。调度程序本身尝试将线程数保持在与 CPU 内核数相同的级别,以避免过多的上下文切换。

Joseph Albahari 在 C#中的线程中有很好的描述

于 2012-06-21T15:56:07.043 回答
1

我使用消息队列/邮箱机制来实现这一点。它类似于演员模型。我有一个有邮箱的类。我称这个班级为我的“工人”。它可以接收消息。这些消息是排队的,它们本质上定义了我希望工作人员运行的任务。工作人员将使用 Task.Wait() 来完成其任务,然后再将下一条消息出列并开始下一个任务。

通过限制我拥有的工人数量,我可以限制正在运行的并发线程/任务的数量。

这在我关于分布式计算引擎的博客文章中通过源代码进行了概述。如果您查看 IActor 和 WorkerNode 的代码,我希望它是有意义的。

https://long2know.com/2016/08/creating-a-distributed-computing-engine-with-the-actor-model-and-net-core/

于 2016-09-08T20:23:55.277 回答