我有一个像这样的 BufferBlock 设置。
_inputQueue = new BufferBlock<WorkItem>(new DataflowBlockOptions
{
BoundedCapacity = 1,
CancellationToken = cancellationToken,
EnsureOrdered = true
});
让多个消费者从不同的线程调用“FetchWork”函数
public async Task<WorkItem> GetWork()
{
WorkItem wi;
try
{
wi = await _inputQueue.ReceiveAsync(new TimeSpan(0, 0, 1));
}
catch (Exception)
{
//since we supplied a timeout, this will be thrown if no items come back
return null;
}
return wi;
}
有时,同一个工作项最终会出现在多个消费者中!InputQueue 中的工作项数量越多,在 GetWork 中收到重复项的机会就越大。我的理解是通过 ReceiveAsync 获取的项目是原子的,一旦读取一个项目,就不会再次读取。这不会发生在这里。我有40 个并行消费者调用 GetWork。