0

我想使用连续运行的 BufferBlock 来实现消费者/生产者模式,类似于这里的问题这里的代码。

我尝试使用像 OP 这样的 ActionBlock,但如果缓冲块已满且新消息在其队列中,则新消息永远不会添加到 ConcurrentDictionary _queue。

在下面的代码中,当使用此调用将新消息添加到缓冲区块时,永远不会调用 ConsumeAsync 方法:_messageBufferBlock.SendAsync(message)

如何更正下面的代码,以便每次使用添加新消息时调用 ConsumeAsync 方法_messageBufferBlock.SendAsync(message)

    public class PriorityMessageQueue 
    {
        private volatile ConcurrentDictionary<int,MyMessage> _queue = new ConcurrentDictionary<int,MyMessage>();
        private volatile BufferBlock<MyMessage> _messageBufferBlock;
        private readonly Task<bool> _initializingTask; // not used but allows for calling async method from constructor
        private int _dictionaryKey;

        public PriorityMessageQueue()
        {
            _initializingTask = Init();
        }

        public async Task<bool> EnqueueAsync(MyMessage message)
        {
            return await _messageBufferBlock.SendAsync(message);
        }

        private async Task<bool> ConsumeAsync()
        {
            try
            {
                // This code does not fire when a new message is added to the buffereblock
                while (await _messageBufferBlock.OutputAvailableAsync())
                {
                    // A message object is never received from the bufferblock
                    var message = await _messageBufferBlock.ReceiveAsync();


                }

                return true;
            }
            catch (Exception ex)
            {
                return false;
            }
        }

        private async Task<bool> Init()
        {
            var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 50
            };

            var prioritizeMessageBlock = new ActionBlock<MyMessage>(msg =>
            {
                SetMessagePriority(msg);
            }, executionDataflowBlockOptions);

            _messageBufferBlock = new BufferBlock<MyMessage>();
            _messageBufferBlock.LinkTo(prioritizeMessageBlock, new DataflowLinkOptions { PropagateCompletion = true, MaxMessages = 50});

            return await ConsumeAsync();
        }
    }

编辑 我已经删除了所有额外的代码并添加了注释。

4

1 回答 1

2

我仍然不完全确定您要完成的工作,但我会尽力为您指明正确的方向。示例中的大部分代码并不是绝对必要的。

我需要知道新消息何时到达

如果这是您唯一的要求,那么我假设您只需要在传入新消息时运行一些任意代码。在数据流中执行此操作的最简单方法是使用 aTransformBlock并将该块设置为管道中的初始接收器. 每个块都有自己的缓冲区,因此除非您需要另一个缓冲区,否则可以将其排除在外。

public class PriorityMessageQueue {        
    private TransformBlock<MyMessage, MyMessage> _messageReciever;

    public PriorityMessageQueue() {
        var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions {
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            BoundedCapacity = 50
        };

        var prioritizeMessageBlock = new ActionBlock<MyMessage>(msg => {
            SetMessagePriority(msg);
        }, executionDataflowBlockOptions);

        _messageReciever = new TransformBlock<MyMessage, MyMessage>(msg => NewMessageRecieved(msg), executionDataflowBlockOptions);
        _messageReciever.LinkTo(prioritizeMessageBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public async Task<bool> EnqueueAsync(MyMessage message) {
        return await _messageReciever.SendAsync(message);
    }

    private MyMessage NewMessageRecieved(MyMessage message) {
        //do something when a new message arrives

        //pass the message along in the pipeline
        return message;
    }

    private void SetMessagePriority(MyMessage message) {
        //Handle a message
    }
}

EnqueAsync当然,您拥有的另一个选择是在返回任务之前立即执行您需要做的任何事情,SendAsync但这TransformBlock会给您额外的灵活性。

于 2017-09-11T15:44:58.810 回答