我有一个现有的 RabbitMQ 部署,其中一些 Java 应用程序使用发送日志消息作为各种通道上的字符串 JSON 对象。我想使用 Celery 来消费这些消息并将它们写入各个地方(例如 DB、Hadoop 等)。
我可以看到 Celery 被设计为既是 RabbitMQ 消息的生产者又是消费者,因为它试图隐藏传递这些消息的机制。有没有办法让 Celery 使用另一个应用程序创建的消息并在它们到达时运行作业?
目前很难将自定义消费者添加到 celery 工作者中,但这在开发版本(成为 3.1)中发生了变化,我添加了对消费者引导步骤的支持。
还没有文档,因为我刚刚完成了它,但这里有一个例子:
from celery import Celery
from celery.bin import Option
from celery.bootsteps import ConsumerStep
from kombu import Consumer, Exchange, Queue
class CustomConsumer(ConsumerStep):
queue = Queue('custom', Exchange('custom'), routing_key='custom')
def __init__(self, c, enable_custom_consumer=False, **kwargs):
self.enable = self.enable_custom_consumer
def get_consumers(self, connection):
return [
Consumer(connection.channel(),
queues=[self.queue],
callbacks=[self.on_message]),
]
def on_message(self, body, message):
print('GOT MESSAGE: %r' % (body, ))
message.ack()
celery = Celery(broker='amqp://localhost//')
celery.steps['consumer'].add(CustomConsumer)
celery.user_options['worker'].add(
Option('--enable-custom-consumer', action='store_true',
help='Enable our custom consumer.'),
)
请注意,最终版本中的 API 可能会发生变化,我还不确定的一件事是在get_consumer(connection)
. 目前消费者的频道在连接丢失时关闭,在关闭时,但人们可能希望手动处理频道。在这种情况下,总是可以自定义 ConsumerStep,或者编写一个新的 StartStopStep。