1

我想在 pika basic_consume 中运行消息回调异步。这可能吗?怎么做?我们已经为其他任务运行了一个异步循环,这个消费者使用带有异步连接的 httpx 来调用内部服务。

这是我们当前的 Consumer 类:

class Consumer:
    """
    https://www.devmashup.com/creating-a-rabbitmq-consumer-in-python/
    """

    connection: AsyncioConnection
    channel: Any
    routing_key: str

    def __init__(self, routing_key) -> None:
        self.connection = self.__create_connection()
        self.channel = self.connection.channel()
        self.__create_exchange()
        self.routing_key = routing_key

    @staticmethod
    def __create_connection():
        credentials = PlainCredentials(
            settings.mqtt_vhost_user, settings.mqtt_vhost_password
        )

        parameters = ConnectionParameters(
            settings.mqtt_host, settings.mqtt_port, settings.mqtt_vhost, credentials
        )

        return AsyncioConnection(parameters)

    def close_connection(self):
        self.connection.close()

    def __create_exchange(self):
        self.channel.exchange_declare(
            exchange=settings.mqtt_exchange,
            exchange_type=settings.mqtt_exchange_type,
            passive=False,
            durable=True,
            auto_delete=False,
            internal=False,
        )

    def consume(self, message_received_callback):
        logger.info(f"Started consumer for {self.routing_key}")

        self.channel.queue_declare(
            queue=self.routing_key,
            passive=False,
            durable=True,
            exclusive=False,
            auto_delete=False,
        )

        self.channel.queue_bind(
            queue=self.routing_key,
            exchange=settings.mqtt_exchange,
            routing_key=self.routing_key,
        )

        async def consume_message(channel, method, properties, body):
            await message_received_callback(body)
            channel.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(
            self.routing_key,
            consume_message,
        )

        self.channel.start_consuming()
4

0 回答 0