0

背景

我们正在使用langohr与 RabbitMQ 进行交互。我们尝试了两种不同的方法来让 RabbitMQ 重新发送我们的服务尚未正确处理的消息。一种可行的方法是将basic.nack带有requeueset 的 a 发送到,true但这将立即重新发送消息,直到服务以 a 响应basic.ack。例如,如果服务尝试将消息持久保存到当前已关闭(并且已关闭一段时间)的数据存储区,这会有点问题。对我们来说,最好每 20 秒左右获取一次未传递的消息(即,我们既不做,basic.ack或者basic.nack如果数据存储已关闭,我们只是让消息保留在队列中)。我们尝试使用ExecutorService其要点来实现这一点,如下所示:

(let [chan (lch/open conn)]  ; We create a new channel since channels in Langohr are not thread-safe
    (log/info "Triggering \"recover\" for channel" chan)
    (try
      (lb/recover chan)
      (catch Exception e (log/error "Failed to call recover" e))
      (finally (lch/close chan))))

不幸的是,这似乎不起作用(消息没有重新传递,只是保留在队列中)。如果我们重新启动服务,排队的消息将被正确使用。但是,我们还有其他使用spring-rabbitmq(Java 中)实现的服务,他们似乎开箱即用地处理了这个问题。我已经尝试查看源代码以弄清楚他们是如何做到的,但我还没有设法这样做。

问题

你如何指示 RabbitMQ 定期(重新)传递队列中的消息(最好使用 Langohr)?

4

2 回答 2

2

我不确定您在使用 Spring AMQP 应用程序做什么,但是 RabbitMQ 中没有为此内置任何内容。

但是,使用 TTL 设置死信非常容易,以便在一段时间后重新排队回到原始队列。有关示例、链接等,请参见此答案。

编辑

但是,Spring AMQP确实有一个重试拦截器,可以将其配置为在重试期间暂停消费者线程一段时间

有状态的重试拒绝和重新排队;stateless retry 在内部处理重试,在重试期间不与代理交互。

于 2016-01-03T16:53:49.230 回答
1

请参阅此答案,其中包含说明:我们对消息进行 Nack,nack 将消息放入保持队列 N 秒,然后它 TTL 从该队列中退出并进入另一个队列,将其放回原始队列中。

设置需要一些工作,但效果很好!

于 2016-01-03T19:08:00.190 回答