如果可能的话,我想为并行启动的任务创建一个异步枚举器。所以首先完成的是枚举的第一个元素,第二个完成的是枚举的第二个元素,依此类推。
public static async IAsyncEnumerable<T> ParallelEnumerateAsync(this IEnumerable<Task<T>> coldAsyncTasks)
{
// ...
}
我敢打赌,有一种使用ContinueWith
和 a的方法Queue<T>
,但我并不完全相信自己可以实现它。
如果可能的话,我想为并行启动的任务创建一个异步枚举器。所以首先完成的是枚举的第一个元素,第二个完成的是枚举的第二个元素,依此类推。
public static async IAsyncEnumerable<T> ParallelEnumerateAsync(this IEnumerable<Task<T>> coldAsyncTasks)
{
// ...
}
我敢打赌,有一种使用ContinueWith
和 a的方法Queue<T>
,但我并不完全相信自己可以实现它。
这是你要找的吗?
public static async IAsyncEnumerable<T> ParallelEnumerateAsync<T>(
this IEnumerable<Task<T>> tasks)
{
var remaining = new List<Task<T>>(tasks);
while (remaining.Count != 0)
{
var task = await Task.WhenAny(remaining);
remaining.Remove(task);
yield return (await task);
}
}
如果我正确理解您的问题,您的重点是启动所有任务,让它们全部并行运行,但确保返回值的处理顺序与启动任务的顺序相同。
查看规范,使用C# 8.0 异步流任务排队以进行并行执行,但顺序返回可能如下所示。
/// Demonstrates Parallel Execution - Sequential Results with test tasks
async Task RunAsyncStreams()
{
await foreach (var n in RunAndPreserveOrderAsync(GenerateTasks(6)))
{
Console.WriteLine($"#{n} is returned");
}
}
/// Returns an enumerator that will produce a number of test tasks running
/// for a random time.
IEnumerable<Task<int>> GenerateTasks(int count)
{
return Enumerable.Range(1, count).Select(async n =>
{
await Task.Delay(new Random().Next(100, 1000));
Console.WriteLine($"#{n} is complete");
return n;
});
}
/// Launches all tasks in order of enumeration, then waits for the results
/// in the same order: Parallel Execution - Sequential Results.
async IAsyncEnumerable<T> RunAndPreserveOrderAsync<T>(IEnumerable<Task<T>> tasks)
{
var queue = new Queue<Task<T>>(tasks);
while (queue.Count > 0) yield return await queue.Dequeue();
}
可能的输出:
#5 is complete
#1 is complete
#1 is returned
#3 is complete
#6 is complete
#2 is complete
#2 is returned
#3 is returned
#4 is complete
#4 is returned
#5 is returned
#6 is returned
实际上,这种模式似乎没有任何新的语言级支持,而且由于异步流处理IAsyncEnumerable<T>
,这意味着基Task
在此处不起作用,并且所有工作async
方法都应该具有相同的Task<T>
返回类型,这在一定程度上限制了基于异步流的设计。
因此,根据您的情况(您是否希望能够取消长时间运行的任务?是否需要按任务处理异常?是否应该限制并发任务的数量?)检查一下可能是有意义的@TheGeneral 的建议。
更新:
请注意,RunAndPreserveOrderAsync<T>
不一定必须使用 a Queue
of 任务 - 选择这只是为了更好地显示编码意图。
var queue = new Queue<Task<T>>(tasks);
while (queue.Count > 0) yield return await queue.Dequeue();
将枚举数转换为List
会产生相同的结果;的主体RunAndPreserveOrderAsync<T>
可以在这里用一行替换
foreach(var task in tasks.ToList()) yield return await task;
在这个实现中,重要的是首先生成和启动所有任务,这与Queue
初始化或将tasks
enumerable 转换为List
. 但是,可能很难抗拒foreach
像这样简化上面的行
foreach(var task in tasks) yield return await task;
这将导致任务按顺序执行而不是并行运行。
我对这项任务的看法。从本主题的其他答案中大量借鉴,但(希望)有一些增强。所以想法是启动任务并将它们放入队列中,与其他答案相同,但像 Theodor Zoulias 一样,我也在尝试限制最大并行度。但是,我试图克服他在评论中提到的限制,方法是在之前的任何任务完成后立即使用任务延续来排队下一个任务。这样,我们当然可以在配置的限制内最大化同时运行的任务的数量。
我不是异步专家,这个解决方案可能有多线程死锁和其他 Heisenbugs,我没有测试异常处理等,所以你已经被警告了。
public static async IAsyncEnumerable<TResult> ExecuteParallelAsync<TResult>(IEnumerable<Task<TResult>> coldTasks, int degreeOfParallelism)
{
if (degreeOfParallelism < 1)
throw new ArgumentOutOfRangeException(nameof(degreeOfParallelism));
if (coldTasks is ICollection<Task<TResult>>) throw new ArgumentException(
"The enumerable should not be materialized.", nameof(coldTasks));
var queue = new ConcurrentQueue<Task<TResult>>();
using var enumerator = coldTasks.GetEnumerator();
for (var index = 0; index < degreeOfParallelism && EnqueueNextTask(); index++) ;
while (queue.TryDequeue(out var nextTask)) yield return await nextTask;
bool EnqueueNextTask()
{
lock (enumerator)
{
if (!enumerator.MoveNext()) return false;
var nextTask = enumerator.Current
.ContinueWith(t =>
{
EnqueueNextTask();
return t.Result;
});
queue.Enqueue(nextTask);
return true;
}
}
}
我们使用这种方法来生成测试任务(借用 DK 的回答):
IEnumerable<Task<int>> GenerateTasks(int count)
{
return Enumerable.Range(1, count).Select(async n =>
{
Console.WriteLine($"#{n} started");
await Task.Delay(new Random().Next(100, 1000));
Console.WriteLine($"#{n} completed");
return n;
});
}
还有他(或她)的测试跑步者:
async void Main()
{
await foreach (var n in ExecuteParallelAsync(GenerateTasks(9),3))
{
Console.WriteLine($"#{n} returned");
}
}
我们在 LinqPad 中得到了这个结果(这太棒了,顺便说一句)
#1 started
#2 started
#3 started
#3 is complete
#4 started
#2 is complete
#5 started
#1 is complete
#6 started
#1 is returned
#2 is returned
#3 is returned
#4 is complete
#7 started
#4 is returned
#6 is complete
#8 started
#7 is complete
#9 started
#8 is complete
#5 is complete
#5 is returned
#6 is returned
#7 is returned
#8 is returned
#9 is complete
#9 is returned
请注意下一个任务是如何在任何先前任务完成后立即开始的,以及它们返回的顺序如何仍然保留。
如果您想采用异步流(IAsyncEnumerable)并Select
并行运行,那么第一个完成的就是第一个出来的:
/// <summary>
/// Runs the selectors in parallel and yields in completion order
/// </summary>
public static async IAsyncEnumerable<TOut> SelectParallel<TIn, TOut>(
this IAsyncEnumerable<TIn> source,
Func<TIn, Task<TOut>> selector)
{
if (source == null)
{
throw new InvalidOperationException("Source is null");
}
var enumerator = source.GetAsyncEnumerator();
var sourceFinished = false;
var tasks = new HashSet<Task<TOut>>();
Task<bool> sourceMoveTask = null;
Task<Task<TOut>> pipeCompletionTask = null;
try
{
while (!sourceFinished || tasks.Any())
{
if (sourceMoveTask == null && !sourceFinished)
{
sourceMoveTask = enumerator.MoveNextAsync().AsTask();
}
if (pipeCompletionTask == null && tasks.Any())
{
pipeCompletionTask = Task.WhenAny<TOut>(tasks);
}
var coreTasks = new Task[] { pipeCompletionTask, sourceMoveTask }
.Where(t => t != null)
.ToList();
if (!coreTasks.Any())
{
break;
}
await Task.WhenAny(coreTasks);
if (sourceMoveTask != null && sourceMoveTask.IsCompleted)
{
sourceFinished = !sourceMoveTask.Result;
if (!sourceFinished)
{
try
{
tasks.Add(selector(enumerator.Current));
}
catch { }
}
sourceMoveTask = null;
}
if (pipeCompletionTask != null && pipeCompletionTask.IsCompleted)
{
var completedTask = pipeCompletionTask.Result;
if (completedTask.IsCompletedSuccessfully)
{
yield return completedTask.Result;
}
tasks.Remove(completedTask);
pipeCompletionTask = null;
}
}
}
finally
{
await enumerator.DisposeAsync();
}
}
可以像下面这样使用:
static async Task Main(string[] args)
{
var source = GetIds();
var strs = source.SelectParallel(Map);
await foreach (var str in strs)
{
Console.WriteLine(str);
}
}
static async IAsyncEnumerable<int> GetIds()
{
foreach (var i in Enumerable.Range(1, 20))
{
await Task.Delay(200);
yield return i;
}
}
static async Task<string> Map(int id)
{
await Task.Delay(rnd.Next(1000, 2000));
return $"{id}_{Thread.CurrentThread.ManagedThreadId}";
}
可能的输出:
[6:31:03 PM] 1_5
[6:31:03 PM] 2_6
[6:31:04 PM] 3_6
[6:31:04 PM] 6_4
[6:31:04 PM] 5_4
[6:31:04 PM] 4_5
[6:31:05 PM] 8_6
[6:31:05 PM] 7_6
[6:31:05 PM] 11_6
[6:31:05 PM] 10_4
[6:31:05 PM] 9_6
[6:31:06 PM] 14_6
[6:31:06 PM] 12_4
[6:31:06 PM] 13_4
[6:31:06 PM] 15_4
[6:31:07 PM] 17_4
[6:31:07 PM] 20_4
[6:31:07 PM] 16_6
[6:31:07 PM] 18_6
[6:31:08 PM] 19_6