3

我有一个返回异步枚举器的方法

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    {
        await Something();
        foreach (var item in ListOfWorkItems)
        {
            yield return DoWork(item);
        }
    }

和来电者:

    public async Task LogResultsAsync()
    {
        await foreach (var result in DoWorkAsync())
        {
            Console.WriteLine(result);
        }
    }

因为DoWork这是一项昂贵的操作,我更喜欢以某种方式并行化它,所以它的工作方式类似于:

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    {
        await Something();
        Parallel.ForEach(ListOfWorkItems, item =>
        {
            yield return DoWork(item);
        });
    }

但是我不能从内部进行收益回报,Parallel.Foreach所以只是想知道最好的方法是什么?

返回结果的顺序无关紧要。

谢谢。

编辑:对不起,我遗漏了一些代码DoWorkAsync,它确实在等待我没有把它放在上面的代码中的东西,因为这与问题不太相关。现已更新

Edit2: DoWork在我的情况下,主要是 I/O 绑定,它从数据库中读取数据。

4

2 回答 2

3

这是一个使用TransformBlock来自TPL 数据流库的基本实现:

public async IAsyncEnumerable<IResult> GetResults(List<IWorkItem> workItems)
{
    // Define the dataflow block
    var block = new TransformBlock<IWorkItem, IResult>(async item =>
    {
        return await TransformAsync(item);
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 10, // the default is 1
        EnsureOrdered = false // the default is true
    });

    // Feed the block with input data
    foreach (var item in workItems)
    {
        block.Post(item);
    }
    block.Complete();

    // Stream the block's output as IAsyncEnumerable
    while (await block.OutputAvailableAsync())
    {
        while (block.TryReceive(out var result))
        {
            yield return result;
        }
    }

    // Propagate possible exceptions
    await block.Completion;
}

这种实现并不完美,因为万一消费者IAsyncEnumerable过早地放弃枚举,TransformBlock它将继续在后台工作,直到所有工作项都被处理完。它也不支持取消,所有可敬IAsyncEnumerable的生产方法都应该支持。这些缺失的功能可以相对容易地添加。如果您有兴趣添加它们,请查看问题。

于 2020-08-13T10:32:42.773 回答
2

正如 canton7 所建议的,您可以使用AsParallel而不是Parallel.ForEach.

这可以在标准foreach循环中使用,您可以在其中产生结果:

public async IAsyncEnumerable<IResult> DoWorkAsync()
{
    await Something();
    foreach (var result in ListOfWorkItems.AsParallel().Select(DoWork))
    {
        yield return result;
    }
}

正如 Theodor Zoulias 所提到的,返回的可枚举实际上根本不是异步的。

如果您只需要使用 this 来使用await foreach它应该不是问题,但更明确地说,您可以返回IEnumerable并让调用者并行化它:

public async Task<IEnumerable<Item>> DoWorkAsync()
{
    await Something();
    return ListOfWorkItems;
}

// Caller...
Parallel.ForEach(await DoWorkAsync(), item => 
{
    var result = DoWork(item);
    //...
});

虽然如果需要在多个地方调用它可能不太容易维护

于 2020-08-13T09:02:50.020 回答