10

我有一个现有的 RabbitMQ 部署,其中一些 Java 应用程序使用发送日志消息作为各种通道上的字符串 JSON 对象。我想使用 Celery 来消费这些消息并将它们写入各个地方(例如 DB、Hadoop 等)。

我可以看到 Celery 被设计为既是 RabbitMQ 消息的生产者又是消费者,因为它试图隐藏传递这些消息的机制。有没有办法让 Celery 使用另一个应用程序创建的消息并在它们到达时运行作业?

4

1 回答 1

15

目前很难将自定义消费者添加到 celery 工作者中,但这在开发版本(成为 3.1)中发生了变化,我添加了对消费者引导步骤的支持。

还没有文档,因为我刚刚完成了它,但这里有一个例子:

from celery import Celery
from celery.bin import Option
from celery.bootsteps import ConsumerStep
from kombu import Consumer, Exchange, Queue

class CustomConsumer(ConsumerStep):
   queue = Queue('custom', Exchange('custom'), routing_key='custom')

   def __init__(self, c, enable_custom_consumer=False, **kwargs):
       self.enable = self.enable_custom_consumer

   def get_consumers(self, connection):
       return [
           Consumer(connection.channel(),
               queues=[self.queue],
               callbacks=[self.on_message]),
       ]

   def on_message(self, body, message):
       print('GOT MESSAGE: %r' % (body, ))
       message.ack()


celery = Celery(broker='amqp://localhost//')
celery.steps['consumer'].add(CustomConsumer)
celery.user_options['worker'].add(
    Option('--enable-custom-consumer', action='store_true',
           help='Enable our custom consumer.'),
)

请注意,最终版本中的 API 可能会发生变化,我还不确定的一件事是在get_consumer(connection). 目前消费者的频道在连接丢失时关闭,在关闭时,但人们可能希望手动处理频道。在这种情况下,总是可以自定义 ConsumerStep,或者编写一个新的 StartStopStep。

于 2012-10-02T12:09:28.570 回答