我想使用自定义侦听器线程通过从服务器获取的 rabbitmq 队列来消费消息。我的情况是有 100 条消息在队列中,我想在应用程序启动时以快速有效的方式使用这些消息。
我创建了一个可以从 main 方法调用的连接。
def create_connection(host,port,username,password):
credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=host_name, port=port, credentials=credentials))
channel = connection.channel()
我创建了消费消息的consume_message 函数,并且我正在使用一个应用程序来消费消息。
def consume_message(QUEUE_NAME):
try:
_consume_message(QUEUE_NAME)
except pika.exceptions.ConnectionClosed:
log.debug("reconnecting to queue %s",QUEUE_NAME)
create_connection(host_name + ":" + port, username, password)
_consume_message(QUEUE_NAME)
except IOError as e:
log.error("Error occurred in consuming the message ." + e)
handle_mq_connection_exception(e)
def _consume_message(QUEUE_NAME):
channel.basic_qos(prefetch_count=1)
channel.basic_consume(QUEUE_NAME), on_message_callback=callback,
auto_ack=True)
channel.start_consuming()
def callback(ch, method, properties, body):
response = json.loads(body.decode())
print(response)
print(" [x] Done")
我们可以在这里执行 ThreadPoolExecutor 来提交消息吗?如果可以,我们该怎么做?
我们正在使用这个属性channel.basic_qos(prefetch_count=1)
,如果我们使用 ThreadPoolExecutor 有什么用?