0

我想使用自定义侦听器线程通过从服务器获取的 rabbitmq 队列来消费消息。我的情况是有 100 条消息在队列中,我想在应用程序启动时以快速有效的方式使用这些消息。

我创建了一个可以从 main 方法调用的连接。

def create_connection(host,port,username,password):
    credentials = pika.PlainCredentials(username, password)
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host_name, port=port, credentials=credentials))
    channel = connection.channel()  

我创建了消费消息的consume_message 函数,并且我正在使用一个应用程序来消费消息。

def consume_message(QUEUE_NAME):
        try:
           _consume_message(QUEUE_NAME)
        except pika.exceptions.ConnectionClosed:
           log.debug("reconnecting to queue %s",QUEUE_NAME)
           create_connection(host_name + ":" + port, username, password)
           _consume_message(QUEUE_NAME)
        except IOError as e:
           log.error("Error occurred in consuming the message ." + e)
           handle_mq_connection_exception(e)
         
def _consume_message(QUEUE_NAME):
         channel.basic_qos(prefetch_count=1)
         channel.basic_consume(QUEUE_NAME), on_message_callback=callback,
                                  auto_ack=True)
         channel.start_consuming()
    
def callback(ch, method, properties, body):
        response = json.loads(body.decode())
        print(response)
        print(" [x] Done")

我们可以在这里执行 ThreadPoolExecutor 来提交消息吗?如果可以,我们该怎么做?

我们正在使用这个属性channel.basic_qos(prefetch_count=1),如果我们使用 ThreadPoolExecutor 有什么用?

4

0 回答 0