我正在尝试channel.consume
使用 Pika 0.11.2 从 RabbitMQ 服务器获取消息时进行验证
通话时get_mq
,我检测到:队列中的消息为空时发生异常。那么,我怎样才能验证异常
这是我的代码:
def send_mq(self, message, queue_name):
try:
if message and queue_name:
message = json.dumps(message)
self.connect(queue_name)
self.channel.basic_publish(exchange="", routing_key=queue_name, body=message)
if self.mqConn:
self.mqConn.close()
except Exception as ex:
sentry_log('send_mq_error', str(ex))
def get_mq(self, callback, queue_name, limit_message=0):
if callback and queue_name:
self.connect(queue_name)
queue = self.channel.queue_declare(queue=queue_name, durable=True, exclusive=False, auto_delete=False)
# Enabled delivery confirmations
self.channel.confirm_delivery()
if queue and queue.method.message_count > 0:
# method_frame, header_frame, body = self.channel.basic_get(queue=queue_name)
# self.channel.basic_ack(delivery_tag=method_frame.delivery_tag)
# self.channel.basic_qos(prefetch_count=1)
# self.channel.basic_consume(lambda ch, method, properties, body: callback(json.loads(body) if body else body), queue=queue_name, no_ack=True)
# # # start consuming (blocks)
# try:
# self.channel.start_consuming()
# except KeyboardInterrupt:
# self.channel.stop_consuming()
try:
# Get ten messages and break out
for method_frame, properties, body in self.channel.consume(queue=queue_name, inactivity_timeout=1):
if body:
# Acknowledge the message
self.channel.basic_ack(method_frame.delivery_tag)
self.channel.basic_qos(prefetch_count=1)
# Escape out of the loop with limit messages
if limit_message > 0 and method_frame.delivery_tag == limit_message:
break
callback(json.loads(body))
else:
break
self.channel.cancel()
except Exception as ex:
self.channel.cancel()
if self.mqConn:
self.mqConn.close()
错误信息:
“Nonetype”对象不可迭代