0

我的下一个项目需要一个调度程序,因为我使用 Django 进行编码,所以我选择了Celery

我正在寻找一种任务在完成时告诉 Django 的方法,这样我就可以更新数据库并使用SSE来告诉用户。只需将所有逻辑放入任务中,所有这些都可以相当简单地完成。但是当我计划有几个芹菜工人时我该怎么办?

我在网上找到了一堆关于单工案例的信息,但如果你有多个工人,则没有多少能涵盖这个问题。

我想到的是使用从工作人员到网络服务器的 http 回调来让它知道任务已经完成。看着celery.task.http看起来很有希望,但没有做我需要的。

是使用信号和连接手动 http 调用的解决方案吗?还是我走错了路?这不是一个普遍的问题吗?如何更优雅地解决这个问题?

4

2 回答 2

1

那么,当你告诉 Django 时,你是什么意思?我理解你的意思吗,初始化芹菜任务的django请求,在这个任务完成的时候还活着吗?在这种情况下,您可以检查一些存储(数据库、memcached 等)。并发送您的 SSE。看,有一种方法可以做到这一点。1. 你 django 查看发送任务到 Celery,之后它进入无限循环(或超时 60 秒的循环?)并在 memcached 中等待结果。

  1. Celery 获取任务执行,并将结果粘贴到 memcached。

  2. Django 视图获得新结果,退出循环并发送您的 SSE。

下一个变体是

  1. Django 视图向 Celery 发送任务,然后返回

  2. Celery 执行任务,执行后向您的 django 应用程序发出简单的 HTTP 请求。

  3. Django 接收来自 Celery 的 http 请求,解析参数并再次将 SSE 发送给您的用户

于 2013-08-13T11:36:52.357 回答
0

这是一些似乎可以满足我要求的代码:

在 Django 设置中:

CELERY_ANNOTATIONS = {
    "*": {
        "on_failure": celery_handlers.on_failure,
        "on_success": celery_handlers.on_success
    }
}

在 celery_handlers.py 文件中包括:

def on_failure(self, exc, task_id, *args, **kwargs):
    # Use urllib or similar to poke eg; api-int.mysite.com/task_handler/TASK_ID
    pass

def on_success(self, retval, task_id, *args, **kwargs):
    # Use urllib or similar to poke eg; api-int.mysite.com/task_handler/TASK_ID
    pass

然后你可以设置 api-int 来使用类似的东西:

from celery.result import AsyncResult
task_obj = AsyncResult(task_id)
# Logic to handle task_obj.result and related goes here....
于 2013-08-13T12:50:39.383 回答