1

我在 Celery 中有一个记录器(使用 RabbitMQ),并希望在紧急情况下复制它的工作。

# tasks.py
@task
def log(message):
    with open('test.txt', 'a') as f:
        f.write(message)


# views.py
log.delay(message)

如何在log()调用时使不同机器上的 2 个 Celery 实例运行?

这样做有意义吗?

这在 RabbitMQ 中是可能的。如果你有一个基于主题的交换,很明显可以将一条消息放入两个不同的队列中并独立地传递给 2 个接收者。

sender =>
[message, routing_key=event.logging.log] => [queue A, topic=event.#]     
                                                      => receiver 1
                                         => [queue B, topic=*.logging.*]
                                                      => receiver 2

消息将被发送到两个队列,并且它们都不会从另一个队列中窃取消息。

4

2 回答 2

1

尝试在两台不同的机器上开始这项任务对我来说没有意义。至少 Celery 不能保证一个任务会在不同的机器上运行——它是 RabbitMQ 分配负载,如果一个节点的负载比另一个节点少——运行的两个任务可能会在那台机器上执行......

使用任务。retry反而。如果执行失败,Celery 将重试任务。Celery 足够聪明,可以理解任务是否失败。只需确保在任务失败时引发一些异常,如果无法成功记录则不要静默返回。

更新:

一个可能的工作流程可能是 - 尝试执行任务,如果它失败,在 on_retry 更改路由键,并尝试在不同的交换/队列中执行任务,这可能是您的故障转移队列。

于 2012-07-02T11:21:05.960 回答
1

为此,您必须将交换配置为主题交换(如您所说):

CELERY_QUEUES = {
   'celery': {
       'exchange': 'celerytopic',
       'exchange_type': 'topic',
       'routing_key': 'celery',
   },
}

然后您可以使用 AMQP api 创建备份交换:

 from celery import current_app as celery

 with celery.broker_connection() as conn:
     conn.default_channel.queue_declare(queue='celery.backup', durable=True)
     conn.default_channel.queue_bind(queue='celery.backup',
                                     exchange='celerytopic',
                                     routing_key='celery',
                                     durable=True)

由于您已经有一个名为 celery 的队列,您可能必须先删除它:

$ camqadm queue.delete celery
于 2012-07-02T15:35:41.527 回答