背景
我们正在使用langohr与 RabbitMQ 进行交互。我们尝试了两种不同的方法来让 RabbitMQ 重新发送我们的服务尚未正确处理的消息。一种可行的方法是将basic.nack
带有requeue
set 的 a 发送到,true
但这将立即重新发送消息,直到服务以 a 响应basic.ack
。例如,如果服务尝试将消息持久保存到当前已关闭(并且已关闭一段时间)的数据存储区,这会有点问题。对我们来说,最好每 20 秒左右获取一次未传递的消息(即,我们既不做,basic.ack
或者basic.nack
如果数据存储已关闭,我们只是让消息保留在队列中)。我们尝试使用ExecutorService
其要点来实现这一点,如下所示:
(let [chan (lch/open conn)] ; We create a new channel since channels in Langohr are not thread-safe
(log/info "Triggering \"recover\" for channel" chan)
(try
(lb/recover chan)
(catch Exception e (log/error "Failed to call recover" e))
(finally (lch/close chan))))
不幸的是,这似乎不起作用(消息没有重新传递,只是保留在队列中)。如果我们重新启动服务,排队的消息将被正确使用。但是,我们还有其他使用spring-rabbitmq(Java 中)实现的服务,他们似乎开箱即用地处理了这个问题。我已经尝试查看源代码以弄清楚他们是如何做到的,但我还没有设法这样做。
问题
你如何指示 RabbitMQ 定期(重新)传递队列中的消息(最好使用 Langohr)?