我知道这是一篇旧帖子,但是-以防万一它对其他人有所帮助-这是我的自我回答问题(我需要问它,因为现有的帮助还不够),它解释了我如何实现在其原始队列上重试失败的消息。以下应该回答您的问题#1 和#3。对于#2,您可能必须使用我没有使用过的高级 API(我认为它违背了Easy NetQ 的目的;还不如直接使用 RabbitMQ 客户端)。不过,也可以考虑实现 IConsumerErrorStrategy。
1) 由于一条消息可以有多个消费者并且所有人都可能不需要重试一条消息,所以我Dictionary<consumerId, RetryInfo>
在消息正文中有一个,因为 EasyNetQ 不(开箱即用)支持消息头中的复杂类型。
public interface IMessageType
{
int MsgTypeId { get; }
Dictionary<string, TryInfo> MsgTryInfo {get; set;}
}
2)我已经实现了一个class RetryEnabledErrorMessageSerializer : IErrorMessageSerializer
,每次框架调用它时只更新 TryCount 和其他信息。我通过 EasyNetQ 提供的 IoC 支持将这个自定义序列化程序附加到每个消费者的框架中。
public class RetryEnabledErrorMessageSerializer<T> : IErrorMessageSerializer where T : class, IMessageType
{
public string Serialize(byte[] messageBody)
{
string stringifiedMsgBody = Encoding.UTF8.GetString(messageBody);
var objectifiedMsgBody = JObject.Parse(stringifiedMsgBody);
// Add/update RetryInformation into objectifiedMsgBody here
// I have a dictionary that saves <key:consumerId, val: TryInfoObj>
return JsonConvert.SerializeObject(objectifiedMsgBody);
}
}
在我的 EasyNetQ 包装类中:
public void SetupMessageBroker(string givenSubscriptionId, bool enableRetry = false)
{
if (enableRetry)
{
_defaultBus = RabbitHutch.CreateBus(currentConnString,
serviceRegister => serviceRegister.Register<IErrorMessageSerializer>(serviceProvider => new RetryEnabledErrorMessageSerializer<IMessageType>(givenSubscriptionId))
);
}
else // EasyNetQ's DefaultErrorMessageSerializer will wrap error messages
{
_defaultBus = RabbitHutch.CreateBus(currentConnString);
}
}
public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
{
IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);
// Using the msgHandler allows to add a mediator between EasyNetQ and the actual callback function
// The mediator can transmit the retried msg or choose to ignore it
return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
}
3) 将消息添加到默认错误队列后,您可以拥有一个简单的控制台应用程序/Windows 服务,该服务会定期在其原始队列中重新发布现有错误消息。就像是:
var client = new ManagementClient(AppConfig.BaseAddress, AppConfig.RabbitUsername, AppConfig.RabbitPassword);
var vhost = client.GetVhostAsync("/").Result;
var aliveRes = client.IsAliveAsync(vhost).Result;
var errQueue = client.GetQueueAsync(Constants.EasyNetQErrorQueueName, vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue, Ackmodes.ack_requeue_false);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue, crit).Result;
foreach (var errMsg in errMsgs)
{
var innerMsg = JsonConvert.DeserializeObject<Error>(errMsg.Payload);
var pubInfo = new PublishInfo(innerMsg.RoutingKey, innerMsg.Message);
pubInfo.Properties.Add("type", innerMsg.BasicProperties.Type);
pubInfo.Properties.Add("correlation_id", innerMsg.BasicProperties.CorrelationId);
pubInfo.Properties.Add("delivery_mode", innerMsg.BasicProperties.DeliveryMode);
var pubRes = client.PublishAsync(client.GetExchangeAsync(innerMsg.Exchange, vhost).Result, pubInfo).Result;
}
4) 我有一个包含回调函数的 MessageHandler 类。每当消息传递给消费者时,它都会转到 MessageHandler,它决定消息尝试是否有效,如果是,则调用实际的回调。如果尝试无效(maxRetriesExceeded/消费者无论如何都不需要重试),我会忽略该消息。在这种情况下,您可以选择死信消息。
public interface IMsgHandler<T> where T: class, IMessageType
{
Task InvokeMsgCallbackFunc(T msg);
Func<T, Task> MsgCallbackFunc { get; set; }
bool IsTryValid(T msg, string refSubscriptionId); // Calls callback only
// if Retry is valid
}
MsgHandler
这是调用回调的中介函数:
public async Task InvokeMsgCallbackFunc(T msg)
{
if (IsTryValid(msg, CurrSubscriptionId))
{
await this.MsgCallbackFunc(msg);
}
else
{
// Do whatever you want
}
}