给定布局:
background \
tasks \
__init__.py
generic.py
helpers.py
__init__.py
_server.py
config.py
router.py
server.py
并启动_server.py
并celery -A background._server worker
我得到了一个KeyError: u'generic.adder'
尝试使用 a 调用generic.adder
函数.delay(..)
加法器函数:
文件generic.py
from background.server import app
from background.tasks.helpers import standardized_task
@standardized_task(app, name='generic.adder')
def adder(x, y):
return x + y
..用一个函数包装,该函数接受app
实例并将 Celery Task 的输入/输出标准化为返回结果和函数的 JSON 对象。(包括在下面)但是,问题是当这个包装函数与 generic.adder 在同一个文件中时,它可以完美地工作——当它像上面那样被导入和使用时,它会抛出关键错误。
我被引导相信包装器以某种方式修改了name=..
传递给app.task
函数名称的属性,从该函数名称helpers.py
导致文字名称generic.adder
的属性,导致从任务访问时找不到
同样重要的是要注意,如果您尝试adder(..)
从内部打电话_server.py
调用(从 celery CLI 运行的模块)它可以完美运行;只有在通过分布式接口调用时才会抛出错误;意思是,进口工作独立于芹菜。
文件helpers.py
__author__ = 'Blake'
import types
JSON_TYPES = [
dict, list, unicode, str, int, long, float, bool, types.NoneType
]
def standardized_task(app, *args, **kwargs):
def wrapped_task(fn):
def wrapped_fn(*fnargs, **fnkwargs):
throws = fnkwargs.get('throws', Exception)
raises = fnkwargs.get('raises', False)
if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
raise ValueError('throws value not of type Exception: %s' % type(throws))
result, error = None, None
try:
result = fn(*fnargs, **fnkwargs)
if type(result) not in JSON_TYPES:
result = unicode(result)
except throws, e:
error = e
if raises:
raise
finally:
return {
'result': result,
'error': str(error) if error else None,
'meta': {
'args': fnargs, 'kwargs': fnkwargs
}
}
return app.task(wrapped_fn, *args, **kwargs)
return wrapped_task
文件_server.py
from background.server import app
from background.tasks.generic import *