我同意其他人的观点,即 TPL Dataflow 听起来是一个很好的解决方案。
为了限制处理,您可以创建一个TransformBlock
实际上不会以任何方式转换数据的方法,如果它在之前的数据之后过早到达,它只会延迟它:
static IPropagatorBlock<T, T> CreateDelayBlock<T>(TimeSpan delay)
{
DateTime lastItem = DateTime.MinValue;
return new TransformBlock<T, T>(
async x =>
{
var waitTime = lastItem + delay - DateTime.UtcNow;
if (waitTime > TimeSpan.Zero)
await Task.Delay(waitTime);
lastItem = DateTime.UtcNow;
return x;
},
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}
然后创建一个生成数据的方法(例如从 0 开始的整数):
static async Task Producer(ITargetBlock<int> target)
{
int i = 0;
while (await target.SendAsync(i))
i++;
}
它是异步编写的,因此如果目标块现在无法处理项目,它将等待。
然后写一个消费者方法:
static void Consumer(int i)
{
Console.WriteLine(i);
}
最后,将它们链接在一起并启动它:
var delayBlock = CreateDelayBlock<int>(TimeSpan.FromMilliseconds(500));
var consumerBlock = new ActionBlock<int>(
(Action<int>)Consumer,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });
Task.WaitAll(Producer(delayBlock), consumerBlock.Completion);
在这里,delayBlock
每 500 毫秒最多接受一项,并且该Consumer()
方法可以并行运行多次。要完成处理,请调用delayBlock.Complete()
。
如果您想为每个 #2 添加一些缓存,您可以创建另一个TransformBlock
在那里完成工作并将其链接到其他块。