我将 Celery 与 AMQP 代理一起使用来调用任务,但响应需要使用与 Celery 使用的不同的队列架构传回,因此我只想使用 Kombu 将消息传回。我已经能够做到这一点,但我每次都在创建一个新的连接。Celery 是否使用代理连接池,如果是,您如何访问它?
问问题
2503 次
2 回答
0
此外,可以通过使用 Bootsteps https://docs.celeryproject.org/en/stable/userguide/extending.html
让我从文档中复制粘贴代码(例如,防止将来出现 404 错误)
from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue
my_queue = Queue('custom', Exchange('custom'), 'routing_key')
app = Celery(broker='amqp://')
class MyConsumerStep(bootsteps.ConsumerStep):
def get_consumers(self, channel):
return [Consumer(channel,
queues=[my_queue],
callbacks=[self.handle_message],
accept=['json'])]
def handle_message(self, body, message):
print('Received message: {0!r}'.format(body))
message.ack()
app.steps['consumer'].add(MyConsumerStep)
def send_me_a_message(who, producer=None):
with app.producer_or_acquire(producer) as producer:
producer.publish(
{'hello': who},
serializer='json',
exchange=my_queue.exchange,
routing_key='routing_key',
declare=[my_queue],
retry=True,
)
if __name__ == '__main__':
send_me_a_message('world!')
于 2021-10-16T16:11:41.933 回答