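One solution is to use BufferBlock<T> from System.Threading.Tasks.Dataflow (included in .NET Core 3+). It does not use GetConsumingEnumerable(), but it still gives you the same utility, mainly:
- allows parallel processing with multiple (symmetrical and/or asymmetrical) consumers and producers
- thread safe (which is what allows the above), so no need to worry about race conditions
- can be canceled by a cancellation token and/or by completing the collection
- consumers block until data is available, avoiding wasted CPU cycles on polling
There is also a BatchBlock<T>, but that limits you to fixed-size batches.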
var buffer = new BufferBlock<Item>();
while (await buffer.OutputAvailableAsync())
{
    if (buffer.TryReceiveAll(out var items))
    {
        // process items
    }
}
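For contrast with the variable-size batches that TryReceiveAll returns, here is a minimal sketch of the fixed-size behavior of BatchBlock<T>. The class name `BatchBlockSketch` and the helper `BatchSizesAsync` are made up for illustration; only the Dataflow calls come from the library. It posts a number of integers and records the size of each emitted batch.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class BatchBlockSketch
{
    // Hypothetical helper: posts `count` integers into a BatchBlock<int> and
    // returns the size of every batch the block emits.
    public static async Task<List<int>> BatchSizesAsync(int count, int batchSize)
    {
        var batcher = new BatchBlock<int>(batchSize);

        for (var i = 0; i < count; i++)
            batcher.Post(i);

        // Completing the block releases any leftover items as one final, smaller batch.
        batcher.Complete();

        var sizes = new List<int>();
        while (await batcher.OutputAvailableAsync())
        {
            // Each received element is an int[] holding one batch.
            while (batcher.TryReceive(out var batch))
                sizes.Add(batch.Length);
        }

        return sizes;
    }
}
```

Calling `await BatchBlockSketch.BatchSizesAsync(7, 3)` should yield batches of 3, 3 and 1 items. A consumer never receives more than `batchSize` items at once, and a partial batch only appears once the block is completed (or a batch is triggered manually), which is why BufferBlock<T> with TryReceiveAll fits better when batch sizes should follow the consumer's pace.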
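Here is a working example that demonstrates the following:
- multiple symmetrical consumers that process variable-length batches in parallel
- multiple symmetrical producers (not truly running in parallel in this example)
- the ability to complete the collection when the producers are done
- to keep the example short, I did not demonstrate the use of a CancellationToken
- the ability to wait until the producers and/or consumers are done
- the ability to call from an area that doesn't allow async, such as a constructor
- the Thread.Sleep() calls are not required, but help simulate some of the processing time that would occur in more taxing scenarios
- both the Task.WaitAll() and the Thread.Sleep() calls can optionally be converted to their async equivalents
- no need to use any external libraries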
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
static class Program
{
    static void Main()
    {
        var buffer = new BufferBlock<string>();
        // Kick off consumer task(s)
        List<Task> consumers = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            // copy the loop variable outside the lambda so each consumer captures its own value
            var num = i;
            // Task.Run unwraps the async lambda, so the stored task finishes only
            // when the consumer loop ends (Task.Factory.StartNew would return a Task<Task>)
            consumers.Add(Task.Run(async () =>
            {
                while (await buffer.OutputAvailableAsync())
                {
                    if (buffer.TryReceiveAll(out var items))
                        Console.WriteLine($"Consumer {num}: " +
                            items.Aggregate((a, b) => a + ", " + b));
                    // real life processing would take some time
                    await Task.Delay(500);
                }
                Console.WriteLine($"Consumer {num} complete");
            }));
            // give consumer tasks time to activate for a better demo
            Thread.Sleep(100);
        }
        // Kick off producer task(s)
        List<Task> producers = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            // compute the range start outside the lambda so later changes to i cannot affect it
            var start = 1000 * i;
            producers.Add(Task.Factory.StartNew(() =>
            {
                for (int j = start; j < start + 500; j++)
                    buffer.Post(j.ToString());
            }));
            // space out the producers for a better demo
            Thread.Sleep(10);
        }
        // may also use the async equivalent
        Task.WaitAll(producers.ToArray());
        Console.WriteLine("Finished waiting on producers");
        // demo being able to complete the collection
        buffer.Complete();
        // may also use the async equivalent
        Task.WaitAll(consumers.ToArray());
        Console.WriteLine("Finished waiting on consumers");
        Console.ReadLine();
    }
}
Here is a modernized and simplified version of the code.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
    private static async Task Main()
    {
        var buffer = new BufferBlock<string>();
        // Kick off consumer task(s)
        var consumers = new List<Task>();
        for (var i = 0; i < 3; i++)
        {
            var id = i;
            consumers.Add(Task.Run(() => StartConsumer(id, buffer)));
            // give consumer tasks time to activate for a better demo
            await Task.Delay(100);
        }
        // Kick off producer task(s)
        var producers = new List<Task>();
        for (var i = 0; i < 3; i++)
        {
            var pid = i;
            producers.Add(Task.Run(() => StartProducer(pid, buffer)));
            // space out the producers for a better demo
            await Task.Delay(10);
        }
        // wait asynchronously until every producer has finished
        await Task.WhenAll(producers);
        Console.WriteLine("Finished waiting on producers");
        // demo being able to complete the collection
        buffer.Complete();
        // wait asynchronously until every consumer has drained the buffer
        await Task.WhenAll(consumers);
        Console.WriteLine("Finished waiting on consumers");
        Console.ReadLine();
    }
    private static async Task StartConsumer(
        int id,
        IReceivableSourceBlock<string> buffer)
    {
        while (await buffer.OutputAvailableAsync())
        {
            if (buffer.TryReceiveAll(out var items))
            {
                Console.WriteLine($"Consumer {id}: " +
                    items.Aggregate((a, b) => a + ", " + b));
            }
            // real life processing would take some time
            await Task.Delay(500);
        }
        Console.WriteLine($"Consumer {id} complete");
    }
    private static Task StartProducer(int pid, ITargetBlock<string> buffer)
    {
        // each producer posts 500 values from its own range: 0-499, 1000-1499, 2000-2499
        for (var j = 0 + (1000 * pid); j < 500 + (1000 * pid); j++)
        {
            buffer.Post(j.ToString());
        }
        return Task.CompletedTask;
    }
}
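Both examples leave out the CancellationToken for brevity. As a rough sketch of how a consumer could support it, the loop below passes a token to OutputAvailableAsync and treats the resulting OperationCanceledException as a normal way to stop. The names `CancellableConsumerSketch` and `ConsumeAsync`, and the choice to return a count plus a flag, are assumptions made for illustration and not part of the original answer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class CancellableConsumerSketch
{
    // Hypothetical helper: drains variable-size batches until the buffer is
    // completed or the token is canceled. Returns how many items were seen and
    // whether the loop ended because of cancellation.
    public static async Task<(int Processed, bool Canceled)> ConsumeAsync(
        IReceivableSourceBlock<string> buffer,
        CancellationToken token)
    {
        var processed = 0;
        try
        {
            // The wait ends with 'false' once the buffer is completed and empty,
            // or throws OperationCanceledException if the token is canceled first.
            while (await buffer.OutputAvailableAsync(token))
            {
                if (buffer.TryReceiveAll(out var items))
                    processed += items.Count; // real processing would go here
            }
            return (processed, false);
        }
        catch (OperationCanceledException)
        {
            return (processed, true);
        }
    }
}
```

A caller would create a CancellationTokenSource, pass `cts.Token` to each consumer, and call `cts.Cancel()` to stop them without completing the buffer. Items still sitting in the buffer at that moment are simply left unprocessed, so completing the collection remains the better choice when everything already posted must be handled.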