我有一个异步任务流,它是通过将异步 lambda 应用于项目流而生成的:
IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
await Task.Delay(100);
return x.ToString();
})
方法AsyncEnumerable.Range
及Select
以上是从System.Linq.Async
包中提供的。
我想要的结果是一个结果流,表示为IAsyncEnumerable<string>
. 结果必须按照与原始任务相同的顺序进行流式传输。此外,必须限制流的枚举,因此在任何给定时间都不能超过指定数量的任务处于活动状态。
我想要一个类型的扩展方法形式的解决方案IAsyncEnumerable<Task<T>>
,以便我可以多次链接它并形成一个处理管道,在功能上与TPL Dataflow管道相似,但表达流畅。以下是理想扩展方法的签名:
public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
this IAsyncEnumerable<Task<TResult>> source,
int concurrencyLevel);
也接受CancellationToken
as 参数将是一个不错的功能。
更新:为了完整起见,我提供了一个通过链接两次AwaitResults
方法形成的流畅处理管道的示例。此管道以 PLINQ 块开始,只是为了证明混合 PLINQ 和 Linq.Async 是可能的。
int[] results = await Partitioner
.Create(Enumerable.Range(1, 20), EnumerablePartitionerOptions.NoBuffering)
.AsParallel()
.AsOrdered()
.WithDegreeOfParallelism(2)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x =>
{
Thread.Sleep(100); // Simulate some CPU-bound operation
return x;
})
.ToAsyncEnumerable()
.Select(async x =>
{
await Task.Delay(300); // Simulate some I/O operation
return x;
})
.AwaitResults(concurrencyLevel: 5)
.Select(x => Task.Run(() =>
{
Thread.Sleep(100); // Simulate another CPU-bound operation
return x;
}))
.AwaitResults(concurrencyLevel: 2)
.ToArrayAsync();
Console.WriteLine($"Results: {String.Join(", ", results)}");
预期输出:
结果:1、2、3、4、5、6、7、8、9、10、11、12、13、14、15、16、17、18、19、20
注意:回想起来,AwaitResults
方法可能应该命名为Merge
,而concurrencyLevel
参数应该命名为maxConcurrent
,因为它的功能类似于Rx库Merge
中存在的运算符。System.Interactive.Async包确实包含一个名为 s 的运算符,但它的所有重载都不会对源进行操作。它在和来源上运作。还可以添加一个参数,以便明确控制等待/合并操作所需的缓冲区大小。Merge
IAsyncEnumerable<T>
IAsyncEnumerable<Task<T>>
IEnumerable<IAsyncEnumerable<TSource>>
IAsyncEnumerable<IAsyncEnumerable<TSource>>
bufferCapacity