2

我正在尝试用芹菜构建一个分布式作业执行系统。

当我在一台机器(本地主机)上启动 2 个工作人员时,其中一个用于加法任务add,另一个用于减法任务sub,然后用于add.delay()启动多个加法任务,减法工作人员的终端出现错误:

[2013-03-05 15:51:18,898: ERROR/MainProcess] Received unregistered task of type 'add_tasks.add'.

在这个测试中,我启动了 2 个加法任务:一个被加法工作者捕获,另一个被减法工作者捕获,这导致了上面的错误。如何更改配置以使减法工作人员不会捕获第二个加法任务?谢谢。

这是代码:

add_tasks.py:

celery = Celery('add_tasks', backend='amqp', broker='amqp://guest@localhost//')

@celery.task
def add(x, y):
    sleep(20)
    return x + y

sub_tasks.py:

celery = Celery('sub_tasks', backend='amqp', broker='amqp://guest@localhost//')

@celery.task
def sub(x, y):
    sleep(10)
    return x - y

我通过 localhost 机器的两个终端启动了工作celery -A add_tasks worker --loglevel=info -n worker1人员celery -A sub_tasks worker --loglevel=info -n worker2

4

1 回答 1

4

最后我发现这个ROUTER功能可以解决我的问题。我把我的解决方案放在这里,希望它对其他有同样问题的人有用。

启动工作人员时,我们可以使用-Q queue选项来限制工作人员只接受queue. 在我的情况下,我使用了celery -A add_tasks worker --loglevel=info -n worker1 -Q addition.

另一方面,当开始一个新任务时,我们应该用队列参数明确指出,例如add.apply_async(queue='addition',priority=0,args=[1,4])and sub.apply_async(queue='subtraction',priority=0,args=[1,4])。那么加法任务将不会被减法工作者接受。

于 2013-03-05T09:52:14.517 回答