0

最近,我一直在阅读celerykombu文档,因为我需要将它们集成到我的一个项目中。我对这应该如何工作有一个基本的了解,但是使用不同经纪人的文档示例让我感到困惑。

这是场景:

在我的应用程序中,我有两个视图ViewAViewB它们都进行了一些昂贵的处理,所以我想让它们使用 celery 任务进行处理。所以这就是我所做的。

视图.py

def ViewA(request):
    tasks.do_task_a.apply_async(args=[a, b])


def ViewB(request):
    tasks.do_task_b.apply_async(args=[a, b])

任务.py

@task()
def do_task_a(a, b):
    # Do something Expensive

@task()
def do_task_b(a, b):
    # Do something Expensive here too

到目前为止,一切正常。问题是在系统上do_task_a创建了一个txt文件,我需要在do_task_b. 现在,在该do_task_b方法中,我可以检查文件是否存在,retry如果文件不存在,则调用任务方法[这是我现在正在做的]。

在这里,我宁愿采用不同的方法(即消息传递的来源)。我想在文件创建后do_task_a发送一条消息,do_task_b而不是循环重试方法,直到创建文件。

我通读了文档celerykombu更新了我的设置,如下所示。

BROKER_URL = "django://"
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = "sqlite:///celery"
TASK_RETRY_DELAY = 30 #Define Time in Seconds
DATABASE_ROUTERS = ['portal.db_routers.CeleryRouter']
CELERY_QUEUES = ( 
    Queue('filecreation', exchange=exchanges.genex, routing_key='file.create'),
)
CELERY_ROUTES = ('celeryconf.routers.CeleryTaskRouter',)

我被困在这里。不知道从这里去哪里。

接下来我应该怎么做才能在文件创建do_task_a中广播消息?do_task_b我应该怎么做才能do_task_b接收(消费)消息并进一步处理代码?

欢迎任何想法和建议。

4

1 回答 1

1

这是使用Celery 的回调/链接函数的一个很好的例子。

Celery 支持将任务链接在一起,以便一个任务跟随另一个任务。你可以在这里阅读更多关于它的信息

apply_async()函数有两个可选参数

+link : excute the linked function on success 
+link_error : excute the linked function on an error

@task
def add(a, b):
    return a + b

@task
def total(numbers):
    return sum(numbers)

@task
def error_handler(uuid):
    result = AsyncResult(uuid)
    exc = result.get(propagate=False)
    print('Task %r raised exception: %r\n%r' % (exc, result.traceback))

现在在您的调用函数中执行类似的操作

def main():
    #for error_handling
    add.apply_async((2, 2), link_error=error_handler.subtask())

    #for linking 2 tasks
    add.apply_async((2, 2), link=add.subtask((8, )))
    # output 12

    #what you can do is your case is something like this. 
    if user_requires:
        add.apply_async((2, 2), link=add.subtask((8, )))
    else:
        add.apply_async((2, 2))

希望这会有所帮助

于 2013-01-04T10:50:36.750 回答