我正在使用 Qpid Proton 使用 Python 来使用 AMQ 中队列中的消息。我正在学习这一点——我远不是这方面的专家,所以我正在寻求帮助。我有一种情况,AMQ 队列包含表示要完成的工作的消息,并且它与数据库(这需要时间)发生冲突。所以我正在从队列中接收“N 条”消息,完成工作,然后重复。问题是我想不出一种方法来知道队列是否为空......
有没有办法在最后一条消息被消费后检测代理中的队列是否为空?如果这是一个单线程消费者,那么没问题;但我可能有几个消费者反对一个队列,所以不可能确切地知道队列中有多少消息。
例如,在 on_message 方法上(这来自 Qpid Python 文档(direct_recv.py),我们有:
def on_message(self, event):
if event.message.id and event.message.id < self.received:
# ignore duplicate message
return
if self.expected == 0 or self.received < self.expected:
print(event.message.body)
self.received += 1
if self.received == self.expected:
event.receiver.close()
event.connection.close()
self.acceptor.close()
该片段显示当 self.received == self.expected 时,消费者将关闭。例如,假设 broker 中的一个队列有 65 条消息,调用此方法的消费者设置 self.expected = 25,则在消费 25 条消息后,这将结束;我可以做到这一点。
现在假设代码尝试使用相同的 25 条消息(self.expected = 25),但是此时队列只有 12 条消息,接收方将无限期挂起 - 直到将更多消息添加到队列中。但是在我的情况下,不会添加更多消息。
有没有办法查询队列并确定队列是否为“空”,如果是,退出?换句话说,类似:
if ( self.received == self.expected ) or (empty queue)
event.receiver.close()
event.connection.close()
self.acceptor.close()
我只是不知道如何知道队列是否为空。这甚至可能吗?
作为旁注 ,我可以通过使用 BlockingConnection 来完成这项工作。它不像使用 Receive 类那样优雅,也没有那么快。所以本质上:
- 创建阻塞连接
- 编写一个循环来接收和接受“N个”消息(或在队列为空时等待超时)
- 关闭阻塞连接。
任何帮助将不胜感激,谢谢!