只要您正确使用它们,您就可以使用任意数量。实际上,使用公开处理管道的后台服务(本质上是单例)是在 .NET Core 中使用它们的一种非常常见的方式。
通道不仅仅是异步队列。它们类似于 DataFlow 块——它们可用于创建处理管道,每个块/worker 处理来自输入缓冲区/ChannelReader 的数据并将结果转发到输出缓冲区/ChannelWriter。DataFlow 块通过任务本身处理异步处理。有了通道,我们需要自己处理工作任务。
我们需要记住的一个非常重要的概念是不能直接访问通道。事实上,在几乎所有情况下,它们甚至都不应该作为字段或属性公开。在大多数情况下,只需要一个 ChannelReader。在某些情况下,例如在管道的头部,可能会暴露 ChannelWriter。或不。
单个工人/步骤
典型的工作步骤如下所示
private ChannelReader<MyType2> Step1(ChannelReader<MyType> reader,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<MyOtherType>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach(var item from reader.ReadAllAsync(token))
{
MyType2 result=........;
await writer.WriteAsync(result);
}
},token).ContinueWith(t=>channel.TryComplete(t));
return channel.Reader;
}
需要注意的一些事项:
- 如果您愿意,您可以创建多个任务,并
Task.WhenAll在关闭频道之前等待所有工作人员完成。
- 如果管道不够快,您可以使用有界通道来防止大量消息累积。
- 如果发出取消信号,则从输入通道读取数据和工作任务都将被取消。
- 当worker任务完成时,无论是被取消还是被抛出,通道都会被关闭。
- 当“头”通道完成时,完成将从一个步骤流向下一步。
组合步骤
通过将一个人的输出阅读器传递给另一个人的输入阅读器,可以组合多个步骤,例如:
var cts=new CancelaltionTokenSource();
var step1=Step1(headReader,cts.Token);
var step2=Step2(step1,cts.Token);
var step3=Step3(step2,cts.Token);
...
await stepN.Completion;
CancellationTokenSource 可用于提前结束管道或设置超时以防止管道挂起。
管道头
“头”阅读器可能来自“适配器”方法,例如:
private ChannelReader<T> ToChannel(IEnumerable<T> input,CancellationToken token)
{
var channel=Channel.CreateUnbounded<T>();
var writer=channel.Writer;
foreach(var item from input)
{
if (token.IsCancellationRequested)
{
break;
}
writer.TryWrite(result);
}
//No-one else is going to complete this channel
channel.Complete();
return channel.Reader;
}
在后台服务的情况下,我们可以使用服务方法将输入“发布”到头部通道,例如:
class MyService
{
Channel<MyType0> _headChannel;
public MyService()
{
_headChannel=Channel.CreateBounded<MyType0>(5);
}
public async Task ExecuteAsync(CancellationToken token)
{
var step1=Step1(_headChannel.Reader,token);
var step2=Step2(step1,token);
await step2.Completion;
}
public Task PostAsync(MyType0 input)
{
return _headChannel.Writer.WriteAsync(input);
}
public Stop()
{
_headChannel.Writer.TryComplete();
}
...
}
我故意使用看起来像BackgroundService 方法名称的方法名称。StartAsync 或 ExecuteAsync 可用于设置管道。StopAsync 可用于表示其完成,例如当最终用户点击Ctrl+时C。
队列 BackgroundService示例中显示的另一个有用技术是注册一个接口,客户端可以使用该接口发布消息,而不是直接访问服务类,例如:
interface IQueuedService<T>
{
Task PostAsync(T input);
}
结合 System.Linq.Async
该ReadAllAsync()方法返回一个IAsyncEnumerable<T>,这意味着我们可以使用System.Linq.Async中的运算符,例如 Where 或 Take 来过滤、批处理或转换消息,例如:
private ChannelReader<MyType> ActiveOnly(ChannelReader<MyType> reader,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<MyType>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
var inpStream=reader.ReadAllAsync(token)
.Where(it=>it.IsActive);
await foreach(var item from inpStream)
{
await writer.WriteAsync(item);
}
},token).ContinueWith(t=>channel.TryComplete(t));
return channel.Reader;
}