8

我将 Celery 与 AMQP 代理一起使用来调用任务,但响应需要使用与 Celery 使用的不同的队列架构传回,因此我只想使用 Kombu 将消息传回。我已经能够做到这一点,但我每次都在创建一个新的连接。Celery 是否使用代理连接池,如果是,您如何访问它?

4

2 回答 2

15

由于 Celery 的文档非常棒,因此进行了大量搜索……但我找到了答案。

Celery 确实使用代理连接池来调用子任务。celery 应用程序有一个pool属性,您可以通过<your_app>.pool或访问该属性celery.current_app.pool。然后,您可以使用pool.acquire().

于 2013-11-14T19:52:55.747 回答
0

此外,可以通过使用 Bootsteps https://docs.celeryproject.org/en/stable/userguide/extending.html

让我从文档中复制粘贴代码(例如,防止将来出现 404 错误)

from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue

my_queue = Queue('custom', Exchange('custom'), 'routing_key')

app = Celery(broker='amqp://')


class MyConsumerStep(bootsteps.ConsumerStep):

    def get_consumers(self, channel):
        return [Consumer(channel,
                         queues=[my_queue],
                         callbacks=[self.handle_message],
                         accept=['json'])]

    def handle_message(self, body, message):
        print('Received message: {0!r}'.format(body))
        message.ack()
app.steps['consumer'].add(MyConsumerStep)

def send_me_a_message(who, producer=None):
    with app.producer_or_acquire(producer) as producer:
        producer.publish(
            {'hello': who},
            serializer='json',
            exchange=my_queue.exchange,
            routing_key='routing_key',
            declare=[my_queue],
            retry=True,
        )

if __name__ == '__main__':
    send_me_a_message('world!')
于 2021-10-16T16:11:41.933 回答