0

我真正想做的是在当前消费者拒绝消息的情况下将消息留在队列中。在 RabbitMQ 中,我可以发送一个 NACK 来完成此操作。EasyNetQ 是否支持 NACK?有没有另一种方法来实现我正在寻找的行为?

更新:回复不多,所以我想知道人们通常如何处理 EasyNetQ 中缺少 NACK。没有相当于basic.reject将消费者限制为“我总是可以处理每条消息”的场景。我想消费者可以抛出一个特定的“被拒绝”异常来导致 EasyNetQ 将消息出列到错误队列中,并且我可以将带有这些错误的消息重新排队。还有其他人有其他解决方法吗?

4

3 回答 3

4

我使用 EasyNetQ 将近一年,但无论我们如何调整它(其中添加了我们自己的实现IConsumerErrorStrategy),我从来没有真正让它按照我想要的方式工作。RequestAsyncSubscribeAsync处理程序中执行时,它是单线程的这一事实给了我们一些意想不到的行为(有时是死锁) 。

我们的解决方案是从 EasyNetQ 迁移。在使用了官方的 RabbitMq Client 一段时间后,我花了几天时间在此基础上编写了一个超瘦客户端。它受 EasyNetQ 的影响,支持 EasyNetQ 的大部分概念。但是,我添加了一些简洁的功能,例如可插入的消息上下文。我认为IAdvancedMessageContext我刚刚添加的 Nack 功能可能适合您:

var client = service.GetService<IBusClient<AdvancedMessageContext>>();
client.RespondAsync<BasicRequest, BasicResponse>((req, ctx) =>
{
  ctx?.Nack(); // the context implements IAdvancedMessageContext.
  return Task.FromResult<BasicResponse>(null);
}, cfg => cfg.WithNoAck(false));

如果您有兴趣,可以在Github 页面(尤其是NackTests.cs)上阅读更多相关信息。

于 2015-11-14T21:58:55.010 回答
2

我认为您可以通过实现自己的 IConsumerErrorStrategy 来改变行为:

https://github.com/EasyNetQ/EasyNetQ/blob/master/Source/EasyNetQ/Consumer/DefaultConsumerErrorStrategy.cs

但是如果你需要那种控制,你可能会考虑直接使用 RabbitMQ 客户端?

于 2015-10-17T12:46:57.663 回答
1

听起来您正在尝试处理故障。您可以 NACK 一条消息,但这意味着它位于队列的头部。很好,但这意味着您最终可能会收到一堆无法处理的消息,并且您将无法实际处理真正的消息。

我在使用 RabbitMQ 时一直使用的解决方案是利用 EasyNetQ 的默认错误处理,并有一个单独的应用程序来重新发送消息。也就是说,当 RabbitMQ 中捕获到异常时,它会将消息路由到名为“EasyNetQ_Default_Error_Queue”的队列。您可以覆盖此名称并让不同的队列转到不同的错误队列,但现在让我们坚持使用默认值。然后,您可以让 Windows 服务/Azure Worker 角色读取这些消息,并确定要做什么。这可能包括在您的消息信封/包装器上有一个“RetryCount”,以确保它只循环这么多次。总而言之,这将是一些工作。

您所发现的,是许多人在使用 RabbitMQ/EasyNetQ 时遇到的问题。她很原始。

于 2016-02-25T21:42:53.667 回答