2

我有一个异步任务流,它是通过将异步 lambda 应用于项目流而生成的:

IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
    await Task.Delay(100);
    return x.ToString();
})

方法AsyncEnumerable.RangeSelect以上是从System.Linq.Async包中提供的。

我想要的结果是一个结果流,表示为IAsyncEnumerable<string>. 结果必须按照与原始任务相同的顺序进行流式传输。此外,必须限制流的枚举,因此在任何给定时间都不能超过指定数量的任务处于活动状态。

我想要一个类型的扩展方法形式的解决方案IAsyncEnumerable<Task<T>>,以便我可以多次链接它并形成一个处理管道,在功能上与TPL Dataflow管道相似,但表达流畅。以下是理想扩展方法的签名:

public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int concurrencyLevel);

也接受CancellationTokenas 参数将是一个不错的功能。


更新:为了完整起见,我提供了一个通过链接两次AwaitResults方法形成的流畅处理管道的示例。此管道以 PLINQ 块开始,只是为了证明混合 PLINQ 和 Linq.Async 是可能的。

int[] results = await Partitioner
    .Create(Enumerable.Range(1, 20), EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(2)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(x =>
    {
        Thread.Sleep(100); // Simulate some CPU-bound operation
        return x;
    })
    .ToAsyncEnumerable()
    .Select(async x =>
    {
        await Task.Delay(300); // Simulate some I/O operation
        return x;
    })
    .AwaitResults(concurrencyLevel: 5)
    .Select(x => Task.Run(() =>
    {
        Thread.Sleep(100); // Simulate another CPU-bound operation
        return x;
    }))
    .AwaitResults(concurrencyLevel: 2)
    .ToArrayAsync();

Console.WriteLine($"Results: {String.Join(", ", results)}");

预期输出:

结果:1、2、3、4、5、6、7、8、9、10、11、12、13、14、15、16、17、18、19、20


注意:回想起来,AwaitResults方法可能应该命名为Merge,而concurrencyLevel参数应该命名为maxConcurrent,因为它的功能类似于RxMerge中存在的运算符。System.Interactive.Async包确实包含一个名为 s 的运算符,它的所有重载都不会对源进行操作。它在和来源上运作。还可以添加一个参数,以便明确控制等待/合并操作所需的缓冲区大小。MergeIAsyncEnumerable<T>IAsyncEnumerable<Task<T>>IEnumerable<IAsyncEnumerable<TSource>>IAsyncEnumerable<IAsyncEnumerable<TSource>>bufferCapacity

4

1 回答 1

3

这是我对该AwaitResults方法的实现。它基于SemaphoreSlim用于控制并发级别的 以及Channel<Task<TResult>>用作异步队列的 。源的枚举IAsyncEnumerable<Task<TResult>>发生在一个即发即弃的任务(馈线)中,它将热任务推送到通道。它还为每个任务附加了一个延续,其中释放了信号量。

该方法的最后一部分是 yielding 循环,其中任务从通道中逐个出列,然后按顺序等待。这样,结果的产生顺序与源流中的任务相同。

此实现要求每个任务等待两次,这意味着它不能用于 type 的源IAsyncEnumerable<ValueTask<TResult>>,因为 aValueTask 只能等待一次

public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int concurrencyLevel = 1,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (concurrencyLevel < 1)
        throw new ArgumentOutOfRangeException(nameof(concurrencyLevel));

    var semaphore = new SemaphoreSlim(concurrencyLevel - 1);
    var channelCapacity = Math.Max(1000, concurrencyLevel * 10);
    var tasksChannel = Channel.CreateBounded<Task<TResult>>(channelCapacity);
    var completionCts = CancellationTokenSource.CreateLinkedTokenSource(
        cancellationToken);

    // Feeder task: fire and forget
    _ = Task.Run(async () =>
    {
        try
        {
            await foreach (var task in source
                .WithCancellation(completionCts.Token).ConfigureAwait(false))
            {
                HandleTaskCompletion(task);
                await tasksChannel.Writer.WriteAsync(task, completionCts.Token)
                    .ConfigureAwait(false);
                await semaphore.WaitAsync(completionCts.Token)
                    .ConfigureAwait(false); // Acquire before MoveNextAsync
            }
            tasksChannel.Writer.Complete();
        }
        catch (Exception ex)
        {
            tasksChannel.Writer.Complete(ex);
        }
    });

    async void HandleTaskCompletion(Task task)
    {
        try
        {
            await task.ConfigureAwait(false);
        }
        catch
        {
            // Ignore exceptions here
        }
        finally
        {
            semaphore.Release();
        }
    }

    try
    {
        while (await tasksChannel.Reader.WaitToReadAsync(cancellationToken)
            .ConfigureAwait(false))
        {
            while (tasksChannel.Reader.TryRead(out var task))
            {
                yield return await task.ConfigureAwait(false);
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
    }
    finally // Happens when the caller disposes the output enumerator
    {
        completionCts.Cancel();
    }
}

一个重要的细节是最终产生循环周围的 try-finally 块。这对于方法调用者过早放弃对结果流的枚举的情况是必需的。在这种情况下,源流的枚举也应该终止,并且该终止使用CancellationTokenSource. 没有它,feeder 任务永远不会完成,对象永远不会被垃圾收集,内存也会泄漏。

注意:取消cancellationToken可能不会立即取消整个操作。为了获得最大的响应能力,cancellationToken应该使用相同的方法来取消单个任务。

于 2020-02-24T11:41:04.890 回答