最近,我一直在阅读celery和kombu文档,因为我需要将它们集成到我的一个项目中。我对这应该如何工作有一个基本的了解,但是使用不同经纪人的文档示例让我感到困惑。
这是场景:
在我的应用程序中,我有两个视图ViewA
,ViewB
它们都进行了一些昂贵的处理,所以我想让它们使用 celery 任务进行处理。所以这就是我所做的。
视图.py
def ViewA(request):
tasks.do_task_a.apply_async(args=[a, b])
def ViewB(request):
tasks.do_task_b.apply_async(args=[a, b])
任务.py
@task()
def do_task_a(a, b):
# Do something Expensive
@task()
def do_task_b(a, b):
# Do something Expensive here too
到目前为止,一切正常。问题是在系统上do_task_a
创建了一个txt
文件,我需要在do_task_b
. 现在,在该do_task_b
方法中,我可以检查文件是否存在,retry
如果文件不存在,则调用任务方法[这是我现在正在做的]。
在这里,我宁愿采用不同的方法(即消息传递的来源)。我想在文件创建后do_task_a
发送一条消息,do_task_b
而不是循环重试方法,直到创建文件。
我通读了文档celery
并kombu
更新了我的设置,如下所示。
BROKER_URL = "django://"
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = "sqlite:///celery"
TASK_RETRY_DELAY = 30 #Define Time in Seconds
DATABASE_ROUTERS = ['portal.db_routers.CeleryRouter']
CELERY_QUEUES = (
Queue('filecreation', exchange=exchanges.genex, routing_key='file.create'),
)
CELERY_ROUTES = ('celeryconf.routers.CeleryTaskRouter',)
我被困在这里。不知道从这里去哪里。
接下来我应该怎么做才能在文件创建do_task_a
中广播消息?do_task_b
我应该怎么做才能do_task_b
接收(消费)消息并进一步处理代码?
欢迎任何想法和建议。