我正在寻找任何帮助。我想修复一个insert_order_queue()
功能,以便能够将消息重新发送到RabbitMQ
如果消息实际上没有传递到服务器,
这是我当前的代码:
def insert_order_queue(self, msg):
''' Insert message into the queue '''
if msg:
msg_props = pika.BasicProperties(delivery_mode=conf.rabbit_msg_props_delivery_mode,
content_type=conf.rabbit_msg_props_content_type)
logger.info('Message : %s' % msg)
try:
self.channel.basic_publish(body=json.dumps(msg),
exchange=conf.rabbit_exchange_name,
properties=msg_props,
routing_key=conf.rabbit_exchange_routing_key)
except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError), error:
logger.error('AMQP Connection failed. Trying again... %s' % error)
self._connect()
return
else:
logger.error('Something wrong')
这是我的_connect()
方法:
def _connect(self):
''' Connecting to the RabbitMQ, and declare queue '''
logger.info('Trying to connect to RabbitMQ')
while True:
try:
conn_broker = pika.BlockingConnection(
pika.ConnectionParameters(
host=conf.rabbit_server,
port=conf.rabbit_port,
virtual_host=conf.rabbit_vhost,
ssl=conf.rabbit_ssl, # do not set it to True if there is no ssl!
heartbeat_interval=conf.rabbit_heartbeat_interval,
credentials=pika.PlainCredentials(
conf.rabbit_user,
conf.rabbit_pass)))
logger.info('Successfully connected to Rabbit at %s:%s' % (conf.rabbit_server, conf.rabbit_port))
channel = conn_broker.channel()
# Don't dispatch a new message to a worker until it has processed and acknowledged the previous one
channel.basic_qos(prefetch_count=conf.rabbit_prefetch_count)
status = channel.queue_declare(queue=conf.rabbit_queue_name,
durable=conf.rabbit_queue_durable,
exclusive=conf.rabbit_queue_exclusive,
passive=conf.rabbit_queue_passive)
if status.method.message_count == 0:
logger.info("Queue empty")
else:
logger.info('Queue status: %s' % status)
channel.queue_bind(
queue=conf.rabbit_queue_name,
exchange=conf.rabbit_exchange_name,
routing_key=conf.rabbit_exchange_routing_key)
return channel
except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError), error:
time.sleep(3)
logger.error('Exception while connecting to Rabbit %s' % error)
else:
break