21

我在带有 EasyNetQ 库的 C# 中使用 RabbitMQ。我在这里使用 pub/sub 模式。我仍然有一些问题希望有人能帮助我解决:

  1. 当使用消息时出现错误时,它会自动移动到错误队列中。如何实现重试(以便将其放回原始队列,并且当它无法处理 X 次时,它会移至死信队列)?
  2. 据我所见,总是有 1 个错误队列用于转储来自所有其他队列的消息。如何每种类型有 1 个错误队列,以便每个队列都有自己关联的错误队列?
  3. 如何轻松重试错误队列中的消息?我尝试了软管,但它只是将消息重新发布到错误队列而不是原始队列。我也不太喜欢这个选项,因为我不想在控制台中摆弄。最好我只是针对错误队列进行编程。

任何人?

4

3 回答 3

17

您在使用 EasyNetQ/RabbitMQ 时遇到的问题是,与 SQS 或 Azure 服务总线/队列等其他消息传递服务相比,它更加“原始”,但我会尽力为您指明正确的方向。

问题 1。

这将由你来做。最简单的方法是你可以在 RabbitMQ/EasyNetQ 中 No-Ack 一条消息,它会被放在队列的头部让你重试。这并不是真正可取的,因为它几乎会立即重试(没有时间延迟),并且还会阻止其他消息被处理(如果您有一个预取计数为 1 的单个订阅者)。

我见过使用“MessageEnvelope”的其他实现。所以一个包装类,当消息失败时,您在 MessageEnvelope 上增加一个重试变量并将消息重新传递回队列。您必须这样做并在消息处理程序周围编写包装代码,这不是 EasyNetQ 的功能。

使用上述方法,我还看到人们使用信封,但允许邮件是死信。一旦它进入死信队列,就会有另一个应用程序/工作人员从死信队列中读取项目。

上述所有这些方法都有一个小问题,因为在处理消息时实际上没有任何好的方法可以使对数/指数/任何类型的延迟增加。在将消息返回队列之前,您可以在代码中“保留”消息一段时间,但这不是一个好方法。

在所有这些选项中,您自己的自定义应用程序读取死信队列并根据包含重试计数的信封决定是否重新路由消息可能是最好的方法。

问题2。

您可以使用高级 API 指定每个队列的死信交换。(https://github.com/EasyNetQ/EasyNetQ/wiki/The-Advanced-API#declaring-queues)。然而,这意味着您将不得不在几乎所有地方使用高级 API,因为使用订阅/发布的简单 IBus 实现查找基于消息类型和订阅者名称命名的队列。使用自定义队列声明意味着您将自己处理队列的命名,这意味着当您订阅时,您将需要知道您想要的名称等。不再为您自动订阅!

问题 3

错误队列/死信队列只是另一个队列。您可以收听此队列并使用它做您需要做的事情。但实际上并没有任何开箱即用的解决方案听起来适合您的需求。

于 2015-06-22T04:32:00.707 回答
9

我已经完全实现了你所描述的。以下是基于我的经验并与您的每个问题相关的一些提示。

Q1(如何重试X次):

为此,您可以使用IMessage.Body.BasicProperties.Headers. 当您使用错误队列中的消息时,只需添加一个带有您选择的名称的标题。在进入错误队列的每条消息上查找此标头并将其递增。这将为您提供正在运行的重试计数。

当一条消息超过 X 的重试限制时,您有一个策略来处理该消息是非常重要的。您不想丢失该消息。就我而言,我当时将消息写入磁盘。它为您提供了许多有用的调试信息以供稍后使用,因为 EasyNetQ 会自动将您的原始消息与错误信息一起包装。它还具有原始消息,因此您可以根据需要手动(或通过一些批处理重新处理代码自动)稍后以某种受控方式重新排列消息。

您可以查看 Hosepipe 实用程序中的代码以了解执行此操作的好方法。事实上,如果您遵循您在那里看到的模式,那么您甚至可以在以后根据需要使用 Hosepipe 将消息重新排队。

Q2(如何为每个原始队列创建一个错误队列):

您可以使用 EasyNetQ Advanced Bus 干净利落地执行此操作。用于IBus.Advanced.Container.Resolve<IConventions>获取约定界面。conventions.ErrorExchangeNamingConvention然后,您可以使用和设置错误队列命名的约定conventions.ErrorQueueNamingConvention。在我的例子中,我将约定设置为基于原始队列的名称,这样每次创建队列时我都会得到一对队列/队列错误。

Q3(如何处理错误队列中的消息):

您可以像执行任何其他队列一样为错误队列声明使用者。同样,AdvancedBus 允许您通过指定从队列中出来的类型是EasyNetQ.SystemMessage.Error. 所以,IAdvancedBus.Consume<EasyNetQ.SystemMessage.Error>()会带你到那里。重试只是意味着重新发布到原始交换(注意您放入标头中的重试计数(请参阅我对上面 Q1 的回答),并且您从错误队列中消耗的错误消息中的信息可以帮助您找到目标重新发布。

于 2015-09-29T18:11:07.030 回答
3

我知道这是一篇旧帖子,但是-以防万一它对其他人有所帮助-这是我的自我回答问题(我需要问它,因为现有的帮助还不够),它解释了我如何实现在其原始队列上重试失败的消息。以下应该回答您的问题#1 和#3。对于#2,您可能必须使用我没有使用过的高级 API(我认为它违背了Easy NetQ 的目的;还不如直接使用 RabbitMQ 客户端)。不过,也可以考虑实现 IConsumerErrorStrategy。

1) 由于一条消息可以有多个消费者并且所有人都可能不需要重试一条消息,所以我Dictionary<consumerId, RetryInfo>在消息正文中有一个,因为 EasyNetQ 不(开箱即用)支持消息头中的复杂类型。

public interface IMessageType
{
    int MsgTypeId { get; }

    Dictionary<string, TryInfo> MsgTryInfo {get; set;}

}

2)我已经实现了一个class RetryEnabledErrorMessageSerializer : IErrorMessageSerializer,每次框架调用它时只更新 TryCount 和其他信息。我通过 EasyNetQ 提供的 IoC 支持将这个自定义序列化程序附加到每个消费者的框架中。

 public class RetryEnabledErrorMessageSerializer<T> : IErrorMessageSerializer where T : class, IMessageType
 {
        public string Serialize(byte[] messageBody)
        {
             string stringifiedMsgBody = Encoding.UTF8.GetString(messageBody);
             var objectifiedMsgBody = JObject.Parse(stringifiedMsgBody);

             // Add/update RetryInformation into objectifiedMsgBody here
             // I have a dictionary that saves <key:consumerId, val: TryInfoObj>

             return JsonConvert.SerializeObject(objectifiedMsgBody);
        }
  }

在我的 EasyNetQ 包装类中:

    public void SetupMessageBroker(string givenSubscriptionId, bool enableRetry = false)
    {
        if (enableRetry)
        {
            _defaultBus = RabbitHutch.CreateBus(currentConnString,
                                                        serviceRegister => serviceRegister.Register<IErrorMessageSerializer>(serviceProvider => new RetryEnabledErrorMessageSerializer<IMessageType>(givenSubscriptionId))
                                                );
        }
        else // EasyNetQ's DefaultErrorMessageSerializer will wrap error messages
        {
            _defaultBus = RabbitHutch.CreateBus(currentConnString);
        }
    }

    public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
    {
        IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);
        // Using the msgHandler allows to add a mediator between EasyNetQ and the actual callback function
        // The mediator can transmit the retried msg or choose to ignore it
        return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
    }

3) 将消息添加到默认错误队列后,您可以拥有一个简单的控制台应用程序/Windows 服务,该服务会定期在其原始队列中重新发布现有错误消息。就像是:

var client = new ManagementClient(AppConfig.BaseAddress, AppConfig.RabbitUsername, AppConfig.RabbitPassword);
var vhost = client.GetVhostAsync("/").Result;
var aliveRes = client.IsAliveAsync(vhost).Result;
var errQueue = client.GetQueueAsync(Constants.EasyNetQErrorQueueName, vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue, Ackmodes.ack_requeue_false);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue, crit).Result;
foreach (var errMsg in errMsgs)
{
    var innerMsg = JsonConvert.DeserializeObject<Error>(errMsg.Payload);
    var pubInfo = new PublishInfo(innerMsg.RoutingKey, innerMsg.Message);
    pubInfo.Properties.Add("type", innerMsg.BasicProperties.Type);
    pubInfo.Properties.Add("correlation_id", innerMsg.BasicProperties.CorrelationId);
    pubInfo.Properties.Add("delivery_mode", innerMsg.BasicProperties.DeliveryMode);
    var pubRes = client.PublishAsync(client.GetExchangeAsync(innerMsg.Exchange, vhost).Result, pubInfo).Result;
}

4) 我有一个包含回调函数的 MessageHandler 类。每当消息传递给消费者时,它都会转到 MessageHandler,它决定消息尝试是否有效,如果是,则调用实际的回调。如果尝试无效(maxRetriesExceeded/消费者无论如何都不需要重试),我会忽略该消息。在这种情况下,您可以选择死信消息。

public interface IMsgHandler<T> where T: class, IMessageType
{
    Task InvokeMsgCallbackFunc(T msg);
    Func<T, Task> MsgCallbackFunc { get; set; }
    bool IsTryValid(T msg, string refSubscriptionId); // Calls callback only 
                                                      // if Retry is valid
}

MsgHandler这是调用回调的中介函数:

    public async Task InvokeMsgCallbackFunc(T msg)
    {
        if (IsTryValid(msg, CurrSubscriptionId))
        {
            await this.MsgCallbackFunc(msg);
        }
        else
        {
            // Do whatever you want
        }
    }
于 2019-03-15T10:22:49.030 回答