Suppose I have an unbounded Channel with many producers and a single consumer:

await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
    await consume(message);
}

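The problem is that the consume function does some IO access, and possibly some network access as well, so more messages may be produced before one message has been consumed. But because the IO resources cannot be accessed concurrently, I can't have many consumers, and I can't throw the consume function into a Task and forget about it either.

The consume function can easily be modified to take multiple messages and process them as a batch. So my question is whether there is a way to make the consumer take all the messages currently in the channel queue whenever it accesses it, something like this: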

while (true) {
    Message[] messages = await channel.Reader.TakeAll();
    await consumeAll(messages);
}

Edit: one option I can think of is:

List<Message> messages = new();
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
    await consume(message);
    Message msg;
    while (channel.Reader.TryRead(out msg))
        messages.Add(msg);
    if (messages.Count > 0)
    {
        await consumeAll(messages);
        messages.Clear();
    }
}

But I feel there should be a better way to do this.

2 Answers

After reading Stephen Toub's primer on channels, I had a go at writing an extension method that should do what you need (it's been a while since I did any C#, so this was fun).

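using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

public static class ChannelReaderEx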
{
    public static async IAsyncEnumerable<IEnumerable<T>> ReadBatchesAsync<T>(
        this ChannelReader<T> reader, 
        [EnumeratorCancellation] CancellationToken cancellationToken = default
    )
    {
        while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            yield return reader.Flush().ToList();
        }
    }

    public static IEnumerable<T> Flush<T>(this ChannelReader<T> reader)
    {
        while (reader.TryRead(out T item))
        {
            yield return item;
        }
    }
}

It can be used like this:

await foreach (var batch in channel.Reader.ReadBatchesAsync())
{
    await ConsumeBatch(batch);
}
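
For anyone who wants to try this end to end, here is a minimal self-contained sketch. It inlines a variant of the ReadBatchesAsync extension above rather than the exact code: it returns List<T> and skips empty batches, which could otherwise occur if another reader drained the channel between WaitToReadAsync and TryRead. The names ChannelBatchDemo and RunAsync are made up for illustration.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelBatchDemo
{
    // Variant of ReadBatchesAsync: drains everything that is immediately
    // available into one list, and never yields an empty batch.
    public static async IAsyncEnumerable<List<T>> ReadBatchesAsync<T>(
        this ChannelReader<T> reader,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            var batch = new List<T>();
            while (reader.TryRead(out var item)) batch.Add(item);
            if (batch.Count > 0) yield return batch;
        }
    }

    // Hypothetical driver: queues five items before any read happens, completes
    // the writer, then records the size of each batch the consumer receives.
    public static async Task<List<int>> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>(
            new UnboundedChannelOptions { SingleReader = true });

        for (int i = 0; i < 5; i++) channel.Writer.TryWrite(i);
        channel.Writer.Complete();

        var batchSizes = new List<int>();
        await foreach (var batch in channel.Reader.ReadBatchesAsync())
        {
            batchSizes.Add(batch.Count);
        }
        return batchSizes; // a single batch holding all five items
    }
}
```

Because all five items are already queued when the consumer starts, they arrive as one batch. With live producers the batch sizes depend on timing, which is the behaviour the question asks for.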
answered 2022-01-13T14:40:22.703

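Solving this problem at the ChannelReader<T> level, as in spender's excellent answer, is practical and sufficient, but solving it at the IAsyncEnumerable<T> level might give a solution with a broader range of applications. Below is an extension method for asynchronous sequences, BufferImmediate, that yields non-empty buffers containing all the elements that are immediately available at the time the sequence is pulled: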

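// Place inside a static class. Needs System.Runtime.CompilerServices
// (EnumeratorCancellation) and System.Runtime.ExceptionServices (ExceptionDispatchInfo).
/// <summary>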
/// Splits the elements of a sequence into chunks that contain all the elements
/// that are immediately available.
/// </summary>
public static IAsyncEnumerable<IList<TSource>> BufferImmediate<TSource>(
    this IAsyncEnumerable<TSource> source, int maxSize = Int32.MaxValue)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (maxSize < 1) throw new ArgumentOutOfRangeException(nameof(maxSize));
    return Implementation();

    async IAsyncEnumerable<IList<TSource>> Implementation(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ValueTask<bool> moveNext = default;
        var enumerator = source.GetAsyncEnumerator(cancellationToken);
        try
        {
            moveNext = enumerator.MoveNextAsync();
            var buffer = new List<TSource>();
            ExceptionDispatchInfo error = null;
            while (true)
            {
                if ((!moveNext.IsCompleted && buffer.Count > 0)
                    || buffer.Count >= maxSize)
                {
                    yield return buffer.ToArray();
                    buffer.Clear();
                }
                else
                {
                    // Await a copy, to prevent a second await on finally.
                    var moveNextCopy = moveNext;
                    moveNext = default;
                    bool moved;
                    try { moved = await moveNextCopy.ConfigureAwait(false); }
                    catch (Exception ex)
                    {
                        error = ExceptionDispatchInfo.Capture(ex); break;
                    }
                    if (!moved) break;
                    buffer.Add(enumerator.Current);
                    try { moveNext = enumerator.MoveNextAsync(); }
                    catch (Exception ex)
                    {
                        error = ExceptionDispatchInfo.Capture(ex); break;
                    }
                }
            }
            if (buffer.Count > 0) yield return buffer.ToArray();
            error?.Throw();
        }
        finally
        {
            // The finally runs when an enumerator created by this method is disposed.
            // Prevent fire-and-forget, otherwise the DisposeAsync() might throw.
            // Swallow MoveNextAsync errors, but propagate DisposeAsync errors.
            try { await moveNext.ConfigureAwait(false); } catch { }
            await enumerator.DisposeAsync().ConfigureAwait(false);
        }
    }
}

Usage example:

await foreach (var batch in channel.Reader.ReadAllAsync().BufferImmediate())
{
    await ConsumeBatch(batch);
}

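The above implementation is non-destructive, meaning that no element that has been consumed from the source sequence can be lost. If the source sequence fails or the enumeration is canceled, any buffered elements are emitted before the error is propagated.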

answered 2022-01-13T23:14:40.683