假设我有 100 个任务需要 10 秒。现在我想一次只运行 10 个,比如当这 10 个任务中的 1 个完成时,另一个任务被执行,直到所有任务都完成。
现在我总是使用ThreadPool.QueueUserWorkItem()
这样的任务,但我已经读到这样做是不好的做法,我应该使用任务来代替。
我的问题是我在任何地方都没有找到适合我的场景的好例子,所以你能让我开始了解如何使用 Tasks 实现这个目标吗?
假设我有 100 个任务需要 10 秒。现在我想一次只运行 10 个,比如当这 10 个任务中的 1 个完成时,另一个任务被执行,直到所有任务都完成。
现在我总是使用ThreadPool.QueueUserWorkItem()
这样的任务,但我已经读到这样做是不好的做法,我应该使用任务来代替。
我的问题是我在任何地方都没有找到适合我的场景的好例子,所以你能让我开始了解如何使用 Tasks 实现这个目标吗?
SemaphoreSlim maxThread = new SemaphoreSlim(10);
for (int i = 0; i < 115; i++)
{
maxThread.Wait();
Task.Factory.StartNew(() =>
{
//Your Works
}
, TaskCreationOptions.LongRunning)
.ContinueWith( (task) => maxThread.Release() );
}
TPL Dataflow非常适合做这样的事情。Parallel.Invoke
你可以很容易地创建一个 100% 的异步版本:
async Task ProcessTenAtOnce<T>(IEnumerable<T> items, Func<T, Task> func)
{
ExecutionDataflowBlockOptions edfbo = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
};
ActionBlock<T> ab = new ActionBlock<T>(func, edfbo);
foreach (T item in items)
{
await ab.SendAsync(item);
}
ab.Complete();
await ab.Completion;
}
你有几个选择。您可以Parallel.Invoke
用于初学者:
public void DoWork(IEnumerable<Action> actions)
{
Parallel.Invoke(new ParallelOptions() { MaxDegreeOfParallelism = 10 }
, actions.ToArray());
}
这是一个替代选项,它将更加努力地运行恰好 10 个任务(尽管线程池中处理这些任务的线程数可能不同),并且返回一个Task
指示何时完成,而不是阻塞直到完成。
public Task DoWork(IList<Action> actions)
{
List<Task> tasks = new List<Task>();
int numWorkers = 10;
int batchSize = (int)Math.Ceiling(actions.Count / (double)numWorkers);
foreach (var batch in actions.Batch(actions.Count / 10))
{
tasks.Add(Task.Factory.StartNew(() =>
{
foreach (var action in batch)
{
action();
}
}));
}
return Task.WhenAll(tasks);
}
如果您没有 MoreLinq,对于该Batch
功能,这是我更简单的实现:
public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int batchSize)
{
List<T> buffer = new List<T>(batchSize);
foreach (T item in source)
{
buffer.Add(item);
if (buffer.Count >= batchSize)
{
yield return buffer;
buffer = new List<T>();
}
}
if (buffer.Count >= 0)
{
yield return buffer;
}
}
I would love to use the simplest solution I can think of which as I think using the TPL:
string[] urls={};
Parallel.ForEach(urls, new ParallelOptions() { MaxDegreeOfParallelism = 2}, url =>
{
//Download the content or do whatever you want with each URL
});
您可以创建这样的方法:
public static async Task RunLimitedNumberAtATime<T>(int numberOfTasksConcurrent,
IEnumerable<T> inputList, Func<T, Task> asyncFunc)
{
Queue<T> inputQueue = new Queue<T>(inputList);
List<Task> runningTasks = new List<Task>(numberOfTasksConcurrent);
for (int i = 0; i < numberOfTasksConcurrent && inputQueue.Count > 0; i++)
runningTasks.Add(asyncFunc(inputQueue.Dequeue()));
while (inputQueue.Count > 0)
{
Task task = await Task.WhenAny(runningTasks);
runningTasks.Remove(task);
runningTasks.Add(asyncFunc(inputQueue.Dequeue()));
}
await Task.WhenAll(runningTasks);
}
然后你可以调用任何异步方法 n 次,限制如下:
Task task = RunLimitedNumberAtATime(10,
Enumerable.Range(1, 100),
async x =>
{
Console.WriteLine($"Starting task {x}");
await Task.Delay(100);
Console.WriteLine($"Finishing task {x}");
});
或者如果你想运行长时间运行的非异步方法,你可以这样做:
Task task = RunLimitedNumberAtATime(10,
Enumerable.Range(1, 100),
x => Task.Factory.StartNew(() => {
Console.WriteLine($"Starting task {x}");
System.Threading.Thread.Sleep(100);
Console.WriteLine($"Finishing task {x}");
}, TaskCreationOptions.LongRunning));
也许在框架的某个地方有类似的方法,但我还没有找到。