考虑一个包含许多需要处理的作业的队列。队列的限制是一次只能获得一份工作,并且无法知道有多少工作。这些作业需要 10 秒才能完成,并且需要大量等待来自 Web 服务的响应,因此不受 CPU 限制。
如果我使用这样的东西
while (true)
{
var job = Queue.PopJob();
if (job == null)
break;
Task.Factory.StartNew(job.Execute);
}
然后它会以比完成它们的速度更快的速度从队列中快速弹出作业,耗尽内存并跌倒在它的屁股上。>.<
我不能使用(我不认为)ParallelOptions.MaxDegreeOfParallelism,因为我不能使用 Parallel.Invoke 或 Parallel.ForEach
我找到了 3 个替代方案
将 Task.Factory.StartNew 替换为
Task task = new Task(job.Execute,TaskCreationOptions.LongRunning) task.Start();
这似乎在一定程度上解决了这个问题,但我不清楚这是在做什么以及这是否是最好的方法。
使用BlockingCollection之类的东西在启动时将作业添加到集合中,并在完成时删除以限制可以运行的数量。
对于#1,我必须相信自动做出正确的决定,#2/#3 我必须计算出可以自己运行的最大任务数。
我是否正确理解了这一点-这是更好的方法,还是有其他方法?
编辑- 这是我从下面的答案中得出的,生产者 - 消费者模式。
以及整体吞吐量目标不是使作业出队的速度超过处理速度,并且没有多个线程轮询队列(此处未显示,但这是一个非阻塞操作,如果从多个地方以高频率轮询将导致巨大的交易成本) .
// BlockingCollection<>(1) will block if try to add more than 1 job to queue (no
// point in being greedy!), or is empty on take.
var BlockingCollection<Job> jobs = new BlockingCollection<Job>(1);
// Setup a number of consumer threads.
// Determine MAX_CONSUMER_THREADS empirically, if 4 core CPU and 50% of time
// in job is blocked waiting IO then likely be 8.
for(int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
{
Thread consumer = new Thread(() =>
{
while (!jobs.IsCompleted)
{
var job = jobs.Take();
job.Execute();
}
}
consumer.Start();
}
// Producer to take items of queue and put in blocking collection ready for processing
while (true)
{
var job = Queue.PopJob();
if (job != null)
jobs.Add(job);
else
{
jobs.CompletedAdding()
// May need to wait for running jobs to finish
break;
}
}