0

所以我们有这个从队列中读取消息的逻辑:

///<inheritdoc/>
        public void ReceiveFromQueue<T>(IMessageHandler<T> callbackHandler, string queuePrefix, string consumer) where T : class, IMessageBase
        {
            if (callbackHandler == null)
            {
                _logger.LogError($"{nameof(ReceiveFromQueue)} error: {nameof(callbackHandler)} is null!");
                throw new MessageBusExcepiton($"{nameof(callbackHandler)} is null!", null);
            }

            if (string.IsNullOrEmpty(queuePrefix))
            {
                _logger.LogError($"{nameof(ReceiveFromQueue)} error: {nameof(queuePrefix)} is null!");
                throw new MessageBusExcepiton($"{nameof(queuePrefix)} is Empty!", null);
            }

            if (string.IsNullOrEmpty(queuePrefix))
            {
                _logger.LogError($"{nameof(ReceiveFromQueue)} error: {nameof(consumer)} is null!");
                throw new MessageBusExcepiton($"{nameof(consumer)} is Empty!", null);
            }

            try
            {
                //CREATE CONNECTION
                Uri uri = new Uri(_settings.BrokerUri);
                _logger.LogDebug($"{nameof(ReceiveFromQueue)} : Trying to create ActiveMQ factory [{_settings.BrokerUri}]");
                ConnectionFactory factory = new ConnectionFactory(uri);

                IConnection connection = factory.CreateConnection(_settings.Username, _settings.Password);

                _logger.LogDebug($"{nameof(ReceiveFromQueue)} : Trying to create session to ActiveMQ Broker [{_settings.BrokerUri}] for user [{_settings.Username}]");
                ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);

                IQueue dest = session.GetQueue(GenerateQueueName(queuePrefix, consumer));
                IMessageConsumer messageConsumer = session.CreateConsumer(dest);

                _logger.LogDebug($"{nameof(ReceiveFromQueue)} : Trying to start connection to ActiveMQ Queue [{dest.QueueName}]");
                connection.Start();

                //Add message handler for consuming messages
                messageConsumer.Listener += new MessageListener((receivedMsg) =>
                {
                    _logger.LogDebug($"Message received on [{dest.QueueName}]", receivedMsg);

                    //try to cast
                    var msg = receivedMsg as IObjectMessage;
                    var body = msg?.Body as T;

                    //In case of unsucessful cast or empty message body will be null
                    if(body == null)
                    {
                        var pMessage = new PoisonMessageModel() { Message = receivedMsg, MessageId = Guid.NewGuid().ToString(), Reason = $"Unsuccessful cast to {typeof(T)}" };
                        _logger.LogDebug($"Message failed on [{dest.QueueName}] for Reason: [\"Unsuccessful cast to {typeof(T)}\"]", receivedMsg);

                        //add poison message to queue
                    }

                    try
                    {
                        if (callbackHandler.Handle(body))
                        {
                            msg.Acknowledge();
                            _logger.LogDebug($"Message {body.MessageId} acknowledged on [{dest.QueueName}]");
                        }

                    } catch(Exception e)
                    {
                        var pMessage = new PoisonMessageModel() { Message = receivedMsg, MessageId = Guid.NewGuid().ToString(), Reason = e.Message };
                        _logger.LogError($"Message handle failed on [{dest.QueueName}] for Reason: [{e.Message}]", e);

                        //add poison message to queue
                    }
                });
            }
            catch (NMSSecurityException exc)
            {
                _logger.LogError($"Error while communicating to ActiveMQ Service!", exc);
            }
            catch (Exception e)
            {
                _logger.LogError($"Error on {nameof(ReceiveFromQueue)}", e);
            }
        }

我们的目标是在队列上有连续的监听器,当队列上有一些消息时会得到通知。我们正在使用故障转移连接,所以如果这足以涵盖网络闪烁和故障以使该侦听器不会停止,我会徘徊吗?如果出现一些错误,如何进行适当的恢复?我们是否需要再次注册监听器或者在这种情况下最佳实践是什么?这将用于多个不同的服务。

4

0 回答 0