2

我设置了一个RabbitMQ 服务器,在其中使用Python-Pika获取消息。问题是,如果我启用了持久传递模式,并且工作人员无法处理消息。它不会释放消息,而是会一直保留到消息,直到 RabbitMQ 连接被重置。

有没有办法确保无法处理的消息在合理的时间范围内从可用的工作人员(包括同一个工作人员)再次被拾起?

这是我当前的代码

if success:
    ch.basic_ack(delivery_tag=method.delivery_tag)
else:
    syslog.syslog('Error (Callback) -- Failed to process payload: %s' % body)

这个想法是我永远不想丢失一条消息,而是希望它被重新发布,或者如果它失败了,我希望它再次被拾起。在工作人员成功处理消息之前,应该始终如此。这通常发生在其中一名工作人员无法打开与 HTTP 服务器的连接时。

4

1 回答 1

1

我终于弄清楚为什么会这样了。我没有意识到,当您完成一条消息时,仅仅确认是不够的,而且还必须拒绝任何您无法使用channel.basic_reject. 这似乎很明显,但这不是AMQP的默认行为。

基本上我们必须使用basic_rejectwith requeueset to来释放消息True。这里的重要因素是requeue防止消息被丢弃的关键字,而是将其再次排队,以便我们可用的工作人员之一可以处理它。

if success:
    # On Success - Mark message as processed.
    ch.basic_ack(delivery_tag=method.delivery_tag)
else:
    # Else - Mark message as rejected and move it back to the queue.
    ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)

我在这篇文章中找到了一些非常有用的信息,并且在这篇博文中有更多关于reject关键字的技术细节。

于 2013-04-15T11:58:35.470 回答