7

1) 我目前正在开发一个 Web 应用程序,该应用程序公开一个 REST api 并使用 Django 和 Celery 来处理请求并解决它们。对于要解决的请求,必须将一组 celery 任务提交到 amqp 队列,以便它们在工作人员(位于其他机器上)上执行。每个任务都占用大量 CPU 资源,并且需要很长时间(数小时)才能完成。

我已将 Celery 配置为也使用 amqp 作为结果后端,并且我使用 RabbitMQ 作为 Celery 的代理。

每个任务都会返回一个结果,该结果需要随后存储在数据库中,而不是由工作人员直接存储。只有“中央节点”——运行 django-celery 并在 RabbitMQ 队列中发布任务的机器——可以访问这个存储数据库,所以工作人员的结果必须以某种方式在这台机器上返回。

问题是之后如何处理任务执行的结果?因此,在工作人员完成后,它的结果将存储在配置的结果后端(amqp)中,但现在我不知道从那里获取结果并处理它们的最佳方法是什么。

我在文档中可以找到的只是您可以不时检查结果的状态:

result.state

这意味着基本上我需要一段专门的代码来定期运行这个命令,因此只用这个来保持整个线程/进程的忙碌,或者用以下方法阻止所有内容:

result.get()

直到任务完成,这不是我想要的。

我能想到的唯一解决方案是在“中央节点”上有一个额外的线程,该线程定期运行一个函数,该函数基本上检查每个任务在提交时返回的 async_results,并在任务具有完成状态时采取行动。

有没有人有任何其他建议?

此外,由于后端结果的处理发生在“中央节点”上,我的目标是尽量减少此操作对这台机器的影响。

最好的方法是什么?

2)人们通常如何解决处理worker返回的结果并放入backend-results的问题?(假设已经配置了后端结果)

4

1 回答 1

2

我不确定我是否完全理解你的问题,但考虑到每个任务都有一个任务 ID。如果用户正在发送任务,您可以存储 ID,然后使用 json 检查结果,如下所示:

#urls.py 
from djcelery.views import is_task_successful

urlpatterns += patterns('',
    url(r'(?P<task_id>[\w\d\-\.]+)/done/?$', is_task_successful,
        name='celery-is_task_successful'),
    )

其他相关概念是信号的概念,每个完成的任务都会发出一个信号。一个完成的任务会发出一个 task_success 信号。更多可以在实时过程中找到。

于 2013-02-17T10:51:01.180 回答