4

我正在使用 txamqp python 库连接到 AMQP 代理(RabbitMQ),并且我有一个带有以下回调的消费者:

@defer.inlineCallbacks
def message_callback(self, message, queue, chan):
    """This callback is a queue listener
    it is called whenever a message was consumed from queue
    c.f. test_amqp.ConsumeTestCase for use cases
    """

    # The callback should be redefined here to keep getting further messages from queue
    queue.get().addCallback(self.message_callback, queue, chan).addErrback(self.message_errback)        

    print " [x] Received a valid message: [%r]" % (message.content.body,)

    yield self.smpp.sendDataRequest(SubmitSmPDU)

    # ACK the message in queue, this will remove it from the queue
    chan.basic_ack(message.delivery_tag)

当“确认”一条消息时,它将从队列中删除(确认?),但是当消息没有“确认”时会发生什么?我需要一个“重试”机制,我可以推迟消息稍后再次回调,并跟踪它重试了多少次。

以及如何从队列中列出/删除消息?

4

2 回答 2

1

RabbitMQ 有一个不错的管理插件,但是它甚至不允许从队列中删除消息。

您基本上必须编写自己的应用程序,或者找出这些第 3 方管理应用程序中的哪些可以删除消息。

于 2012-09-21T22:24:36.163 回答
1

已解决,为了重试队列中的消息,必须拒绝带有“重试”标志的消息,它将重新排入队列。

如果我用计时器拒绝它(callLater in twisted),则排队的消息将被推迟到我想要的任何时间。

于 2012-12-12T15:08:18.940 回答