我想让 pika 的 basic_consume 消息回调以异步方式运行。这可行吗？该怎么做？我们已经有一个用于其他任务的 asyncio 事件循环在运行，而这个消费者会通过 httpx 的异步连接来调用内部服务。
这是我们当前的 Consumer 类:
class Consumer:
    """
    RabbitMQ consumer that runs an ``async`` message callback on asyncio.

    Based on
    https://www.devmashup.com/creating-a-rabbitmq-consumer-in-python/

    ``AsyncioConnection`` is non-blocking: the constructor returns before the
    connection is open, and every step (open channel, declare exchange,
    declare queue, bind, consume) completes through a callback once the
    asyncio event loop runs. Setup is therefore chained through callbacks
    instead of being issued back to back.

    pika invokes the ``basic_consume`` callback synchronously, so an
    ``async def`` passed to it directly would only create a coroutine that is
    never awaited. Each delivery is instead wrapped in an asyncio task, and
    the message is acknowledged only after the coroutine finishes without
    raising.
    """

    connection: AsyncioConnection
    channel: Any
    routing_key: str

    def __init__(self, routing_key) -> None:
        """Start connecting; the channel and exchange are set up on open.

        Args:
            routing_key: Used both as the queue name and as the binding key.
        """
        self.routing_key = routing_key
        # Populated by __on_channel_open once the connection is established.
        self.channel = None
        # Coroutine function registered through consume().
        self._message_received_callback = None
        # True once the exchange declaration has been confirmed.
        self._exchange_ready = False
        # True once queue setup has been kicked off, to avoid doing it twice.
        self._consume_started = False
        # Strong references to in-flight tasks so they are not collected
        # before they finish.
        self._pending_tasks = set()
        self.connection = self.__create_connection(
            on_open_callback=self.__on_connection_open,
            on_open_error_callback=self.__on_connection_open_error,
            on_close_callback=self.__on_connection_closed,
        )

    @staticmethod
    def __create_connection(
        on_open_callback=None,
        on_open_error_callback=None,
        on_close_callback=None,
    ):
        """Build the ``AsyncioConnection`` from the project settings.

        Args:
            on_open_callback: Called with the connection once it is open.
            on_open_error_callback: Called with the connection and the error
                if it cannot be opened.
            on_close_callback: Called with the connection and the reason
                when it closes.

        Returns:
            The (not yet open) ``AsyncioConnection``.
        """
        credentials = PlainCredentials(
            settings.mqtt_vhost_user, settings.mqtt_vhost_password
        )
        parameters = ConnectionParameters(
            settings.mqtt_host, settings.mqtt_port, settings.mqtt_vhost, credentials
        )
        return AsyncioConnection(
            parameters,
            on_open_callback=on_open_callback,
            on_open_error_callback=on_open_error_callback,
            on_close_callback=on_close_callback,
        )

    def close_connection(self):
        """Close the connection unless it is already closing or closed."""
        if self.connection.is_closing or self.connection.is_closed:
            return
        self.connection.close()

    def consume(self, message_received_callback):
        """Register the coroutine callback and start consuming.

        Returns immediately; deliveries are handled by the running asyncio
        event loop. If the exchange is not ready yet, consuming starts as
        soon as it is.

        Args:
            message_received_callback: Coroutine function awaited with the
                message body for every delivery.
        """
        self._message_received_callback = message_received_callback
        if self._exchange_ready:
            self.__declare_queue()

    def __on_connection_open(self, connection):
        """Open a channel once the connection is established."""
        connection.channel(on_open_callback=self.__on_channel_open)

    def __on_connection_open_error(self, connection, error):
        """Log a failure to establish the connection."""
        logger.error(f"Could not open connection for {self.routing_key}: {error}")

    def __on_connection_closed(self, connection, reason):
        """Reset the setup state when the connection goes away."""
        self.channel = None
        self._exchange_ready = False
        self._consume_started = False
        logger.info(f"Connection closed for {self.routing_key}: {reason}")

    def __on_channel_open(self, channel):
        """Store the open channel and declare the exchange on it."""
        self.channel = channel
        self.__create_exchange()

    def __create_exchange(self):
        """Declare the exchange; continues in __on_exchange_declared."""
        self.channel.exchange_declare(
            exchange=settings.mqtt_exchange,
            exchange_type=settings.mqtt_exchange_type,
            passive=False,
            durable=True,
            auto_delete=False,
            internal=False,
            callback=self.__on_exchange_declared,
        )

    def __on_exchange_declared(self, _frame):
        """Mark the exchange ready and start consuming if already requested."""
        self._exchange_ready = True
        if self._message_received_callback is not None:
            self.__declare_queue()

    def __declare_queue(self):
        """Declare the queue; continues in __on_queue_declared."""
        if self._consume_started:
            return
        self._consume_started = True
        self.channel.queue_declare(
            queue=self.routing_key,
            passive=False,
            durable=True,
            exclusive=False,
            auto_delete=False,
            callback=self.__on_queue_declared,
        )

    def __on_queue_declared(self, _frame):
        """Bind the queue to the exchange; continues in __on_queue_bound."""
        self.channel.queue_bind(
            queue=self.routing_key,
            exchange=settings.mqtt_exchange,
            routing_key=self.routing_key,
            callback=self.__on_queue_bound,
        )

    def __on_queue_bound(self, _frame):
        """Register the synchronous delivery callback with the broker."""
        self.channel.basic_consume(
            self.routing_key,
            self.__on_message,
        )
        logger.info(f"Started consumer for {self.routing_key}")

    def __on_message(self, channel, method, properties, body):
        """Schedule the coroutine handler for one delivery.

        pika calls this synchronously, so the coroutine is wrapped in a task
        rather than awaited here.
        """
        # Imported locally so the class does not depend on a module-level
        # import that may not exist in the file.
        import asyncio

        task = asyncio.ensure_future(self.__process_message(channel, method, body))
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)

    async def __process_message(self, channel, method, body):
        """Await the user callback and ack the message on success."""
        try:
            await self._message_received_callback(body)
        except Exception:
            # Not acked: the message stays unacknowledged, matching the
            # original intent of acking only after the callback succeeds.
            logger.exception(f"Message callback failed for {self.routing_key}")
            return
        # The channel may have closed while the callback was running.
        if channel.is_open:
            channel.basic_ack(delivery_tag=method.delivery_tag)