0

我有一个像这样的 BufferBlock 设置。

_inputQueue = new BufferBlock<WorkItem>(new DataflowBlockOptions
{
    BoundedCapacity = 1,
    CancellationToken = cancellationToken,
    EnsureOrdered = true
});

让多个消费者从不同的线程调用“FetchWork”函数

public async Task<WorkItem> GetWork()
{
    WorkItem wi;
    try
    {
        wi = await _inputQueue.ReceiveAsync(new TimeSpan(0, 0, 1));
    }
    catch (Exception)
    {
        //since we supplied a timeout, this will be thrown if no items come back
        return null;
    }
    return wi;
}

有时,同一个工作项最终会出现在多个消费者中!InputQueue 中的工作项数量越多,在 GetWork 中收到重复项的机会就越大。我的理解是通过 ReceiveAsync 获取的项目是原子的,一旦读取一个项目,就不会再次读取。这不会发生在这里。我有40 个并行消费者调用 GetWork。

4

1 回答 1

1

这似乎是一个服务结构问题。BufferBlock 仅将项目出队一次。生产者 [分区计数为 5 的服务结构有状态服务实例] 在不同分区中两次接收相同的项目。必须对此进行调查。

于 2019-03-16T22:32:14.183 回答