我想使用连续运行的 BufferBlock 来实现消费者/生产者模式,类似于这里的问题和这里的代码。
我尝试使用像 OP 这样的 ActionBlock,但如果缓冲块已满且新消息在其队列中,则新消息永远不会添加到 ConcurrentDictionary _queue。
在下面的代码中,当使用此调用将新消息添加到缓冲区块时,永远不会调用 ConsumeAsync 方法:_messageBufferBlock.SendAsync(message)
如何更正下面的代码,以便每次使用添加新消息时调用 ConsumeAsync 方法_messageBufferBlock.SendAsync(message)
?
public class PriorityMessageQueue
{
private volatile ConcurrentDictionary<int,MyMessage> _queue = new ConcurrentDictionary<int,MyMessage>();
private volatile BufferBlock<MyMessage> _messageBufferBlock;
private readonly Task<bool> _initializingTask; // not used but allows for calling async method from constructor
private int _dictionaryKey;
public PriorityMessageQueue()
{
_initializingTask = Init();
}
public async Task<bool> EnqueueAsync(MyMessage message)
{
return await _messageBufferBlock.SendAsync(message);
}
private async Task<bool> ConsumeAsync()
{
try
{
// This code does not fire when a new message is added to the buffereblock
while (await _messageBufferBlock.OutputAvailableAsync())
{
// A message object is never received from the bufferblock
var message = await _messageBufferBlock.ReceiveAsync();
}
return true;
}
catch (Exception ex)
{
return false;
}
}
private async Task<bool> Init()
{
var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 50
};
var prioritizeMessageBlock = new ActionBlock<MyMessage>(msg =>
{
SetMessagePriority(msg);
}, executionDataflowBlockOptions);
_messageBufferBlock = new BufferBlock<MyMessage>();
_messageBufferBlock.LinkTo(prioritizeMessageBlock, new DataflowLinkOptions { PropagateCompletion = true, MaxMessages = 50});
return await ConsumeAsync();
}
}
编辑 我已经删除了所有额外的代码并添加了注释。