4

我创建了一个 celery 任务脚本,如下所示:

from celery import Task
from celery.contrib.methods import task
from celery.contrib.methods import task_method
from pipelines.addsub import settings
from pipelines.addsub.log import register_task_log


@register_task_log(__name__)
class AddTask(Task):

    @task(filter=task_method, name='AddTask.get')
    def get(self, x, y):
        self.log.info("Calling task add(%d, %d)" % (x, y))
        return x + y

我确实定义了以下队列和路线:

CELERY_QUEUES = {
    'celery': {
        'exchange': 'celery',
        'binding_key': 'celery',
    },
    'addsub': {
        'exchange': 'addsub',
        'binding_key': 'addsub.operations',
    },
}

CELERY_ROUTES = {
    'AddTask.get': {
        'queue': 'addsub',
        'routing_key': 'addsub.operations',
    },
}

我按如下方式启动芹菜工人:

celery -c 2 -A pipelines.celery.celery worker -Q addsub -E -l DEBUG --logfile=~/celery_workflows/addsubtasks/addsub.log

我可以从celery shell成功运行AddTask.get(1,3)

然后我使用node-celery模块运行以下 node.js 脚本:

"use strict";
var celery = require('node-celery'),
    client = celery.createClient({
        CELERY_BROKER_URL: 'amqp://[user]:[password]@[hostname]:5672//prote.broker',
        CELERY_RESULT_BACKEND: 'amqp',
        CELERY_ROUTES: {'AddTask.get': {queue: 'addsub'}}
    }),
    get_addition = client.createTask('AddTask.get');

client.on('error', function (err) {
    console.log(err);
});

client.on('connect', function () {
    console.log('Connected ...');
    get_addition.call([], {
        x: 1,
        y: 3
    }); // sends a task to the addsub queue
});

该脚本返回以下错误:

2014-09-13 14:18:59,422: INFO/MainProcess] Received task: AddTask.get[261fb059-b88e-444b-b218-c3c24c94fc1d]
[2014-09-13 14:18:59,422: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7fc407d5fde8> (args:(u'AddTask.get', u'261fb059-b88e-444b-b218-c3c24c94fc1d', [], {u'y': 3, u'x': 1}, {u'task': u'AddTask.get', u'group': None, u'is_eager': False, u'delivery_info': {u'priority': None, u'redelivered': False, u'routing_key': 'addsub', u'exchange': ''}, u'args': [], u'headers': {}, u'correlation_id': None, u'hostname': 'celery@pcs01', u'kwargs': {u'y': 3, u'x': 1}, u'reply_to': None, u'id': u'261fb059-b88e-444b-b218-c3c24c94fc1d'}) kwargs:{})
[2014-09-13 14:18:59,425: DEBUG/MainProcess] Task accepted: AddTask.get[261fb059-b88e-444b-b218-c3c24c94fc1d] pid:6536
[2014-09-13 14:18:59,425: ERROR/MainProcess] Task AddTask.get[261fb059-b88e-444b-b218-c3c24c94fc1d] raised unexpected: TypeError('get() takes exactly 3 arguments (2 given)',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 437, in __protected_call__
    return self.run(*args, **kwargs)
TypeError: get() takes exactly 3 arguments (2 given)

该脚本确实将正确的x: & y:参数传递给 celery worker,但self参数未正确处理。有谁明白为什么会发生这种情况?

我已经使用定义一组函数而不是具有成员函数的类的任务脚本成功地测试了上面指定的node.js脚本:

from pipelines.celery.celery import app
from pipelines.addsub import settings
from celery.utils.log import get_task_logger


log = get_task_logger(__name__)


@app.task(name='add')
def add(x, y):
    log.info("Calling task add(%d, %d)" % (x, y))
    return x + y


@app.task(name='subtract')
def subtract(x, y):
    log.info("Calling task subtract(%d, %d)" % (x, y))
    return x - y

我猜celery.contrib.methods模块在我上面描述的情况下失败了。有人对这个问题有任何见解吗?

4

0 回答 0