6

在我的测试应用程序中,我可以看到处理后的异常消息被自动插入到默认的 EasyNetQ_Default_Error_Queue 中,这很棒。然后,我可以使用Hosepipe成功转储或重新排队这些消息,这也可以正常工作,但需要下拉到命令行并调用 Hosepipe 和 RabbitMQ API 以清除重试消息的队列。

所以我认为我的应用程序最简单的方法是简单地订阅错误队列,这样我就可以使用相同的基础设施重新处理它们。但在 EastNetQ 中,错误队列似乎很特殊。我们需要使用正确的类型和路由 ID 进行订阅,所以我不确定错误队列的这些值应该是什么:

bus.Subscribe<WhatShouldThisBe>("and-this", ReprocessErrorMessage);

我可以使用简单的 API 订阅错误队列,还是需要深入研究高级 API

如果我的原始消息的类型是TestMessage,那么我希望能够执行以下操作:

bus.Subscribe<ErrorMessage<TestMessage>>("???", ReprocessErrorMessage);

其中ErrorMessage是 EasyNetQ 提供的用于包装所有错误的类。这可能吗?

4

2 回答 2

3

您不能使用简单的 API 来订阅错误队列,因为它不遵循 EasyNetQ 队列类型命名约定——也许这是应该修复的;)

但高级 API 工作正常。您不会得到原始消息,但很容易获得 JSON 表示,您可以很容易地对自己进行反序列化(使用 Newtonsoft.JSON)。这是您的订阅代码应如下所示的示例:

[Test]
[Explicit("Requires a RabbitMQ server on localhost")]
public void Should_be_able_to_subscribe_to_error_messages()
{
    var errorQueueName = new Conventions().ErrorQueueNamingConvention();
    var queue = Queue.DeclareDurable(errorQueueName);
    var autoResetEvent = new AutoResetEvent(false);

    bus.Advanced.Subscribe<SystemMessages.Error>(queue, (message, info) =>
    {
        var error = message.Body;

        Console.Out.WriteLine("error.DateTime = {0}", error.DateTime);
        Console.Out.WriteLine("error.Exception = {0}", error.Exception);
        Console.Out.WriteLine("error.Message = {0}", error.Message);
        Console.Out.WriteLine("error.RoutingKey = {0}", error.RoutingKey);

        autoResetEvent.Set();
        return Task.Factory.StartNew(() => { });
    });

    autoResetEvent.WaitOne(1000);
}

在此之前,我必须修复在 EasyNetQ 中编写代码的错误消息中的一个小错误,因此请在尝试之前获取一个 >= 0.9.2.73 的版本。您可以在此处查看代码示例

于 2013-02-25T10:15:45.660 回答
0

有效的代码:(我猜了一下)

'foo' 的诡异之处在于,如果我只是将该函数 HandleErrorMessage2 传递给 Consume 调用,它无法确定它返回的是 void 而不是 Task,因此无法确定要使用哪个重载。(VS 2012)分配给一个 var 让它快乐。您将需要捕获调用的返回值,以便能够通过处置对象来取消订阅。

另请注意,有人使用了系统对象名称(队列)而不是使其成为 EasyNetQueue 或其他名称,因此您必须为编译器添加使用说明,或完全指定它。

 using Queue = EasyNetQ.Topology.Queue;

  private const string QueueName = "EasyNetQ_Default_Error_Queue";
  public static void Should_be_able_to_subscribe_to_error_messages(IBus bus)
  {
     Action <IMessage<Error>, MessageReceivedInfo> foo = HandleErrorMessage2;

     IQueue queue = new Queue(QueueName,false);
     bus.Advanced.Consume<Error>(queue, foo);
  }

  private static void HandleErrorMessage2(IMessage<Error> msg, MessageReceivedInfo info)
 {
 }
于 2014-07-17T19:56:32.337 回答