假设我有很多生产者,1 个消费者未绑定 Channel,有一个消费者:
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
await consume(message);
}
问题是该consume
函数会进行一些 IO 访问,并且可能还会进行一些网络访问,因此在消耗 1 条消息之前可能会产生更多消息。但是由于IO资源不能并发访问,所以不能有很多消费者,也不能把consume
函数扔到Task中就忘掉了。
该consume
功能可以轻松修改以获取多条消息并批量处理它们。所以我的问题是,是否有办法让消费者在尝试访问通道队列时获取通道队列中的所有消息,如下所示:
while (true) {
Message[] messages = await channel.Reader.TakeAll();
await consumeAll(messages);
}
编辑:我能想到的 1 个选项是:
List<Message> messages = new();
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
await consume(message);
Message msg;
while (channel.Reader.TryRead(out msg))
messages.Add(msg);
if (messages.Count > 0)
{
await consumeAll(messages);
messages.Clear();
}
}
但我觉得应该是一个更好的方法来做到这一点。