0

我正在寻找任何帮助。我想修复一个insert_order_queue()功能,以便能够将消息重新发送到RabbitMQ如果消息实际上没有传递到服务器,

这是我当前的代码:

def insert_order_queue(self, msg):
    ''' Insert message into the queue '''
    if msg:
        msg_props = pika.BasicProperties(delivery_mode=conf.rabbit_msg_props_delivery_mode,
                                         content_type=conf.rabbit_msg_props_content_type)
        logger.info('Message : %s' % msg) 
        try:
            self.channel.basic_publish(body=json.dumps(msg),
                                       exchange=conf.rabbit_exchange_name,
                                       properties=msg_props,
                                       routing_key=conf.rabbit_exchange_routing_key) 
        except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError), error:
            logger.error('AMQP Connection failed. Trying again... %s' % error)
            self._connect()
            return
    else:
        logger.error('Something wrong') 

这是我的_connect()方法:

def _connect(self):
    ''' Connecting to the RabbitMQ, and declare queue '''
    logger.info('Trying to connect to RabbitMQ')
    while True:
        try:
            conn_broker = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host=conf.rabbit_server,
                    port=conf.rabbit_port,
                    virtual_host=conf.rabbit_vhost,
                    ssl=conf.rabbit_ssl, # do not set it to True if there is no ssl!
                    heartbeat_interval=conf.rabbit_heartbeat_interval,
                    credentials=pika.PlainCredentials(
                        conf.rabbit_user,
                        conf.rabbit_pass)))
            logger.info('Successfully connected to Rabbit at %s:%s' % (conf.rabbit_server, conf.rabbit_port)) 
            channel = conn_broker.channel()
            # Don't dispatch a new message to a worker until it has processed and acknowledged the previous one
            channel.basic_qos(prefetch_count=conf.rabbit_prefetch_count)
            status = channel.queue_declare(queue=conf.rabbit_queue_name,
                                           durable=conf.rabbit_queue_durable,
                                           exclusive=conf.rabbit_queue_exclusive,
                                           passive=conf.rabbit_queue_passive)
            if status.method.message_count == 0:
                logger.info("Queue empty")
            else:
                logger.info('Queue status: %s' % status)                  
            channel.queue_bind(
                queue=conf.rabbit_queue_name,
                exchange=conf.rabbit_exchange_name,
                routing_key=conf.rabbit_exchange_routing_key)  
            return channel
        except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError), error:
            time.sleep(3)
            logger.error('Exception while connecting to Rabbit %s' % error)
        else:
            break 
4

1 回答 1

1

有几种方式无法“传递”消息

最明显的是“与rabbit的连接已关闭”,在这种情况下您只需重新连接并重新发送(就重新连接而言,您已经拥有大部分逻辑,只需要重新发送消息)。

然后,“没有人在收听此消息”有几种变体。这些可以通过 basic_publish 上的立即和强制标志来处理。有关更多信息,请参见:http: //bunnyamqp.wordpress.com/2009/08/21/amqp-basic-publish-immediate-versus-mandatory/

最后,您可以添加确认回调。Pika 允许您设置此回调:

https://github.com/pika/pika/blob/master/pika/channel.py#L387

在该回调中,您可以决定是否再次发送消息。

于 2013-07-25T18:04:10.460 回答