5

我正在运行一个 Django 网站并且刚刚让 Celery 运行,但我遇到了令人困惑的错误。以下是代码的结构。

在测试.py 中:

from tasks import *
from celery.result import AsyncResult

project = Project.objects.create()
# initalize various sub-objects of the project

c = function.delay(project.id)
r = AsyncResult(c.id).ready()
f = AsyncResult(c.id).failed()
# wait until the task is done  
while not r and not f:
    r = AsyncResult(c.id).ready()
    f = AsyncResult(c.id).failed()

self.assertEqual() #will fail because task fails

在tasks.py中:

from __future__ import absolute_import
from celery import shared_task

@shared_task
def function(project_id)
    #a bunch of calculations followed by a save of the project
    project = Project.objects.get(project=project_id)

    for part in project.part_set.all():
        partFunction(part.id)
        p = Part.objects.get(id=part.id)
        # add to various variables in project from variables in p
    project.save()

在 mainapp/settings.py 中:

BROKER_URL = "amqp://ipaddress"
CELERY_RESULT_BACKEND='amqp'
CELERY_ACCEPT_CONTENT = ['json','pickle','msgpack','yaml']
CELERY_IGNORE_RESULT = False

芹菜调试控制台日志必须按列表/元组:

[INFO/MainProcess] Received task: myapp.tasks.function[id]
[ERROR/MainProcess] Task myapp.tasks.function[id]
    raised unexpected: ValueError('task args must be a list or tuple',)
Traceback:
   File "/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
       R = retval = fun(*args, **kwargs)
   File "/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
       return self.run(*args, **kwargs)
   File "/myapp/tasks.py", line 28, in function
       p = Part.objects.get(id=part.id)
   File "/python2.7/site-packages/celery/app/task.py", line 555, in apply_async
       **dict(self._get_exec_options(), **options)
   File "/python2.7/site-packages/celery/app/base.py", line 351, in send_task
       reply_to=reply_to or self.oid, **options
   File "celery/app/amqp.py", line 252, in publish_task
       raise ValueError('task args must be a list or tuple')
ValueError: task args must be a list or tuple

我得到的错误如上所述,AsyncResult(c.id).result: task args must be a list or tuple。这应该是一个简单的解决方案,但事实并非如此。当我把它列成这样的列表时:

inline = [project.id]
c = function.delay(inline)

然后它改变主意并告诉我AsyncResult(c.id).result: int() argument must be a string or a number, not 'list'

正如你可以想象的那样,我对可能是什么问题感到非常困惑。


编辑

任务.py

@shared_task
def function(app):
    @app.task(name='myapp.function', bind=True)
    def function(project_id):

测试.py

c = function.s(project.id).delay()

function.app 打印

4

1 回答 1

8

您在任务中的代码中遇到错误,它显示在回溯中:

File "/myapp/tasks.py", line 28, in function
   p = Part.objects.get(id=part.id)

您的代码似乎是正确的,但从追溯来看,芹菜似乎有一个旧版本的腌制任务。非常重要的是,每当您更改内部的任何内容时都重新启动 celery task.py(也许即使您更改了其他文件,但我不这么认为)。这可能是你的问题的原因,它咬了我几次。

此外,没有理由part单独从数据库中提取,p = Part.objects.get(id=part.id)因为当您在for part in project.part_set.all():. 这只是一个建议,您可能有更多需要该步骤的代码。

作为旁注,如果它是您项目中的一项任务,而不是某些可重用应用程序的一部分,只需使用常规@task装饰器,芹菜会找到它,但请确保您Celery正确配置应用程序,这是我的另一篇文章,可以指导您完成:Celery / Django Single Tasks 多次运行

因此,只要您正确配置了所有内容,就可以像以前一样使用它:

@task #or @shared_task
def function(project_id)
    #a bunch of calculations followed by a save of the project
    project = Project.objects.get(project=project_id)
    ....

然后调用它:

result = function.delay(project.id)

或者:

result = function.apply_async(args=(project.id,))

显然,我还建议直接通过不带 celery 调用它来测试任务function(project.id),但我相信你知道这一点。

于 2014-06-18T21:35:25.910 回答