您的代码试图在生成任何消息之前使用通道中的所有消息。虽然您可以存储生产者/消费者任务而不是等待它们,但最好使用特定于通道的习语和模式。
与其将 Channel 用作某种容器,不如仅将 Readers 公开和共享给消费者创建和拥有的通道。这就是在 Go 中使用 Channels 的方式。
这就是为什么你也只能使用 ChannelReader 和 ChannelWriter 的原因:
- ChannelReader 是
ch ->
Go 中的一个,是从通道读取的唯一方法
- ChannelWriter 是
ch <-
Go 中唯一的写法。
使用自有频道
如果您需要异步处理数据,请在生产者/消费者方法中的任务中执行此操作。这使得控制通道和知道处理何时完成或取消变得容易得多。它还允许您非常轻松地从通道构建管道。
在您的情况下,生产者可能是:
public ChannelReader<int> Producer(CancellationToken cancellationToken)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Task.Run(()=>{
for (int i = 0; i < 59; i++)
{
await Task.Delay(1000, cancellationToken);
await writer.WriteAsync(i, cancellationToken);
}
},cancellationToken)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
消费者,如果是懒惰的,可以是:
static async Task ConsumeNumbers(this ChannelReader<int> reader, CancellationToken cancellationToken)
{
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
Console.WriteLine(item);
}
}
使其成为扩展方法两者都可以与:
await Producer(cancel)
.ConsumeNumbers(cancel);
在更一般的情况下,管道块从通道读取并返回通道:
public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
var newItem=Math.Pow(item,pow);
await writer.WriteAsync(newItem);
}
},cancellationToken)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
这将允许创建一系列步骤,例如:
await Producer(cancel)
.RaiseTo(0.3,cancel)
.RaiseTo(3,cancel)
.ConsumeNumbers(cancel);
并行处理
每个块也可以使用多个任务,以加快处理速度。在 .NET 6 中,这可以通过以下方式轻松完成Parallel.ForEachAsync
:
public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Parallel.ForEachAsync(
reader.ReadAllAsync(cancellationToken),
cancellationToken,
async item=>
{
var newItem=Math.Pow(item,pow);
await writer.WriteAsync(newItem);
})
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
注意顺序
通道保留项目和读取请求的顺序。这意味着单任务步骤将始终按顺序消费和产生消息。虽然没有这样的保证Parallel.ForEachAsync
。如果顺序很重要,您必须添加代码以确保按顺序发出消息,或者尝试通过另一个步骤重新排序它们。