6

我想使用使用 java nio 编写的彗星服务器来发送实时更新。接收信息时,我希望它扫描数据,并通过rabbitmq 将任务发送到工作线程。理想情况下,我希望 celery 服务器位于 rabbit 的另一端,管理一个处理这些任务的工作线程池。

但是,据我了解,celery 是在rabbitmq 的两端工作的,它本质上是通过嵌入消费者和生产者的代码中来扮演生产者和消费者的角色。有没有办法像我上面描述的那样设置芹菜?谢谢

4

2 回答 2

8

不必使用 Celery 发布消息。您可以从您自己的应用程序向 RabbitMQ 或其他代理发布消息,并使用 Celery 来消费任务。

Celery 使用简单的消息协议。您可以在应用程序中实现客户端。

如果您不想实现协议的客户端,您可以实现一个简单的 http 服务器,它接受请求并进行适当的调用。像这样

于 2012-08-15T09:05:01.303 回答
8

是的,当然!

您可以添加Custom Message Consumers到 celery 应用程序。

请参考 celery 文档中的Extensions and Bootsteps

这是上面链接中的示例代码的一部分:

from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue

my_queue = Queue('custom', Exchange('custom'), 'routing_key')

app = Celery(broker='amqp://')


class MyConsumerStep(bootsteps.ConsumerStep):

    def get_consumers(self, channel):
        return [Consumer(channel,
                         queues=[my_queue],
                         callbacks=[self.handle_message],
                         accept=['json'])]

    def handle_message(self, body, message):
        print('Received message: {0!r}'.format(body))
        message.ack()
app.steps['consumer'].add(MyConsumerStep)

测试它:

python -m celery -A 主要工作者

另请参阅:将 Celery 与现有的 RabbitMQ 消息一起使用

于 2016-01-23T13:06:33.407 回答