0

给定布局:

background \
    tasks  \
        __init__.py
        generic.py
        helpers.py
    __init__.py
    _server.py
    config.py
    router.py
    server.py

并启动_server.pycelery -A background._server worker

我得到了一个KeyError: u'generic.adder'尝试使用 a 调用generic.adder函数.delay(..)

加法器函数:

文件generic.py

from background.server import app
from background.tasks.helpers import standardized_task

@standardized_task(app, name='generic.adder')
def adder(x, y):
    return x + y

..用一个函数包装,该函数接受app实例并将 Celery Task 的输入/输出标准化为返回结果和函数的 JSON 对象。(包括在下面)但是,问题是当这个包装函数与 generic.adder 在同一个文件中时,它可以完美地工作——当它像上面那样被导入和使用时,它会抛出关键错误

我被引导相信包装器以某种方式修改了name=..传递给app.task函数名称的属性,从该函数名称helpers.py导致文字名称generic.adder的属性,导致从任务访问时找不到

同样重要的是要注意,如果您尝试adder(..)从内部打电话_server.py调用(从 celery CLI 运行的模块)它可以完美运行;只有在通过分布式接口调用时才会抛出错误;意思是,进口工作独立于芹菜。

文件helpers.py

__author__ = 'Blake'

import types

JSON_TYPES = [
    dict, list, unicode, str, int, long, float, bool, types.NoneType
]

def standardized_task(app, *args, **kwargs):
    def wrapped_task(fn):
        def wrapped_fn(*fnargs, **fnkwargs):
            throws = fnkwargs.get('throws', Exception)
            raises = fnkwargs.get('raises', False)

            if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
                raise ValueError('throws value not of type Exception: %s' % type(throws))

            result, error = None, None

            try:
                result = fn(*fnargs, **fnkwargs)

                if type(result) not in JSON_TYPES:
                    result = unicode(result)

            except throws, e:
                error = e

                if raises:
                    raise
            finally:
                return {
                    'result': result,
                    'error': str(error) if error else None,
                    'meta': {
                        'args': fnargs, 'kwargs': fnkwargs
                    }
                }

        return app.task(wrapped_fn, *args, **kwargs)
    return wrapped_task

文件_server.py

from background.server import app
from background.tasks.generic import *
4

1 回答 1

0

答案不是使用装饰器,而是将 celery.Task 扩展为抽象类并使用,@app.task(name='...', base=MyNewAbstractTask)

以下SO帖子更好地解释了它:

芹菜任务和自定义装饰器

import types

JSON_TYPES = [
    dict, list, unicode, str, int, long, float, bool, types.NoneType
]

class StandardizedTask(Task):
    abstract = True

    def __call__(self, *args, **kwargs):
        return self.inner_run(*args, **kwargs)

    def inner_run(self, *args, **kwargs):
        throws = kwargs.get('throws', Exception)
        raises = kwargs.get('raises', False)

        if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
            raise ValueError('throws value not of type Exception: %s' % type(throws))

        result, error = None, None

        try:
            result = self.run(*args, **kwargs)

            if type(result) not in JSON_TYPES:
                result = unicode(result)

        except throws, e:
            error = e

            if raises:
                raise
        finally:
            return {
                'result': result,
                'error': str(error) if error else None,
                'meta': {
                    'args': args, 'kwargs': kwargs }}
于 2015-03-12T06:55:22.983 回答