所以我们有这个从队列中读取消息的逻辑:
///<inheritdoc/>
public void ReceiveFromQueue<T>(IMessageHandler<T> callbackHandler, string queuePrefix, string consumer) where T : class, IMessageBase
{
if (callbackHandler == null)
{
_logger.LogError($"{nameof(ReceiveFromQueue)} error: {nameof(callbackHandler)} is null!");
throw new MessageBusExcepiton($"{nameof(callbackHandler)} is null!", null);
}
if (string.IsNullOrEmpty(queuePrefix))
{
_logger.LogError($"{nameof(ReceiveFromQueue)} error: {nameof(queuePrefix)} is null!");
throw new MessageBusExcepiton($"{nameof(queuePrefix)} is Empty!", null);
}
if (string.IsNullOrEmpty(queuePrefix))
{
_logger.LogError($"{nameof(ReceiveFromQueue)} error: {nameof(consumer)} is null!");
throw new MessageBusExcepiton($"{nameof(consumer)} is Empty!", null);
}
try
{
//CREATE CONNECTION
Uri uri = new Uri(_settings.BrokerUri);
_logger.LogDebug($"{nameof(ReceiveFromQueue)} : Trying to create ActiveMQ factory [{_settings.BrokerUri}]");
ConnectionFactory factory = new ConnectionFactory(uri);
IConnection connection = factory.CreateConnection(_settings.Username, _settings.Password);
_logger.LogDebug($"{nameof(ReceiveFromQueue)} : Trying to create session to ActiveMQ Broker [{_settings.BrokerUri}] for user [{_settings.Username}]");
ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
IQueue dest = session.GetQueue(GenerateQueueName(queuePrefix, consumer));
IMessageConsumer messageConsumer = session.CreateConsumer(dest);
_logger.LogDebug($"{nameof(ReceiveFromQueue)} : Trying to start connection to ActiveMQ Queue [{dest.QueueName}]");
connection.Start();
//Add message handler for consuming messages
messageConsumer.Listener += new MessageListener((receivedMsg) =>
{
_logger.LogDebug($"Message received on [{dest.QueueName}]", receivedMsg);
//try to cast
var msg = receivedMsg as IObjectMessage;
var body = msg?.Body as T;
//In case of unsucessful cast or empty message body will be null
if(body == null)
{
var pMessage = new PoisonMessageModel() { Message = receivedMsg, MessageId = Guid.NewGuid().ToString(), Reason = $"Unsuccessful cast to {typeof(T)}" };
_logger.LogDebug($"Message failed on [{dest.QueueName}] for Reason: [\"Unsuccessful cast to {typeof(T)}\"]", receivedMsg);
//add poison message to queue
}
try
{
if (callbackHandler.Handle(body))
{
msg.Acknowledge();
_logger.LogDebug($"Message {body.MessageId} acknowledged on [{dest.QueueName}]");
}
} catch(Exception e)
{
var pMessage = new PoisonMessageModel() { Message = receivedMsg, MessageId = Guid.NewGuid().ToString(), Reason = e.Message };
_logger.LogError($"Message handle failed on [{dest.QueueName}] for Reason: [{e.Message}]", e);
//add poison message to queue
}
});
}
catch (NMSSecurityException exc)
{
_logger.LogError($"Error while communicating to ActiveMQ Service!", exc);
}
catch (Exception e)
{
_logger.LogError($"Error on {nameof(ReceiveFromQueue)}", e);
}
}
我们的目标是在队列上有连续的监听器,当队列上有一些消息时会得到通知。我们正在使用故障转移连接,所以如果这足以涵盖网络闪烁和故障以使该侦听器不会停止,我会徘徊吗?如果出现一些错误,如何进行适当的恢复?我们是否需要再次注册监听器或者在这种情况下最佳实践是什么?这将用于多个不同的服务。