.NET 已经拥有 DataFlow 块形式的 pub/sub 和 worker 机制,最近还有 Channels。
数据流
System.Threading.Tasks.Dataflow命名空间中的数据流块是构建工作人员和工作人员管道的“旧”方式(2012 年及以后)。每个块都有一个输入和/或输出缓冲区。发布到块的每条消息都由后台的一个或多个任务处理。对于具有输出的块,每次迭代的输出都存储在输出缓冲区中。
块可以组合成类似于 CMD 或 Powershell 管道的管道,每个块在其自己的任务上运行。
在最简单的情况下,ActionBlock可以用作工作者:
void ProcessCustomer(Customer customer)
{
....
}
var block =new ActionBlock<Customer>(cust=>ProcessCustomer(cust));
而已。无需手动出队或轮询。
生产者方法可以开始将客户实例发送到块。它们中的每一个都将按照发布的顺序在后台进行处理:
foreach(var customer in bigCustomerList)
{
block.Post(customer);
}
完成后,例如当应用程序终止时,生产者只需调用Complete()
块并等待任何剩余条目完成。
block.Complete();
await block.Completion;
块也可以使用异步方法。
频道
通道是一种新机制,内置于 .NET Core 3 中,可在以前的 .NET Framework 和 .NET Core 版本中作为NuGet使用。生产者使用 ChannelWriter 写入通道,消费者使用 ChannelReader 从通道读取。在您意识到它允许一些强大的模式之前,这可能看起来有点奇怪。
生产者可能是这样的,例如,生产者以 0.5 秒的延迟“生产”列表中的所有客户:
ChannelReader<Customer> Producer(IEnumerable<Customer> customers,CancellationToken token=default)
{
//Create a channel that can buffer an infinite number of entries
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;
//Start a background task to produce the data
_ = Task.Run(async ()=>{
foreach(var customer in customers)
{
//Exit gracefully in case of cancellation
if (token.IsCancellationRequested)
{
return;
}
await writer.WriteAsync(customer,token);
await Task.Delay(500);
}
},token)
//Ensure we complete the writer no matter what
.ContinueWith(t=>writer.Complete(t.Exception);
return channel.Reader;
}
这有点复杂,但请注意该函数唯一需要返回的是 ChannelReader。取消令牌对于提前终止生产者很有用,例如在超时之后或应用程序关闭时。
当作者完成时,频道的所有读者也将完成。
消费者只需要 ChannelReader 工作:
async Task Consumer(ChannelReader<Customer> reader,CancellationToken token=default)
{
while(await reader.WaitToReadAsync(token))
{
while(reader.TryRead(out var customer))
{
//Process the customer
}
}
}
如果编写器完成,WaitToReadAsync
将返回false
并且循环将退出。
在 .NET Core 3 中,ChannelReader 通过 ReadAllAsync 方法支持 IAsyncEnumerable,使代码更加简单:
async Task Consumer(ChannelReader<Customer> reader,CancellationToken token=default)
{
await foreach(var customer in reader.ReadAllAsync(token))
{
//Process the customer
}
}
生产者创建的阅读器可以直接传递给消费者:
var customers=new []{......}
var reader=Producer(customers);
await Consumer(reader);
中间步骤可以从前一个通道阅读器读取数据并将数据发布到下一个,例如订单生成器:
ChannelReader<Order> ConsumerOrders(ChannelReader<Customer> reader,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;
//Start a background task to produce the data
_ = Task.Run(async ()=>{
await foreach(var customer in reader.ReadAllAsync(token))
{
//Somehow create an order for the customer
var order=new Order(...);
await writer.WriteAsync(order,token);
}
},token)
//Ensure we complete the writer no matter what
.ContinueWith(t=>writer.Complete(t.Exception);
return channel.Reader;
}
同样,我们需要做的就是将读者从一种方法传递到另一种方法
var customers=new []{......}
var customerReader=Producer(customers);
var orderReader=CustomerOrders(customerReader);
await ConsumeOrders(orderReader);