20

这是我的设置:

  • django 1.3
  • 芹菜 2.2.6
  • Django 芹菜 2.2.4
  • djkombu 0.9.2

在我的 settings.py 文件中,我有

BROKER_BACKEND = "djkombu.transport.DatabaseTransport"

即我只是使用数据库来排队任务。

现在谈谈我的问题:我有一个用户启动的任务,可能需要几分钟才能完成。我希望每个用户只运行一次任务,并且我会将任务的结果缓存在一个临时文件中,所以如果用户再次启动任务,我只返回缓存的文件。我的视图函数中有如下代码:

task_id = "long-task-%d" % user_id
result = tasks.some_long_task.AsyncResult(task_id)

if result.state == celery.states.PENDING:
    # The next line makes a duplicate task if the user rapidly refreshes the page
    tasks.some_long_task.apply_async(task_id=task_id)
    return HttpResponse("Task started...")
elif result.state == celery.states.STARTED:
    return HttpResponse("Task is still running, please wait...")
elif result.state == celery.states.SUCCESS:
    if cached_file_still_exists():
        return get_cached_file()
    else:
        result.forget()
        tasks.some_long_task.apply_async(task_id=task_id)
        return HttpResponse("Task started...")

这段代码几乎可以工作。但是当用户快速重新加载页面时,我遇到了问题。从任务排队到任务最终从队列中拉出并交给工作人员之间有 1-3 秒的延迟。在此期间,任务的状态保持为 PENDING,这会导致视图逻辑启动重复任务。

我需要的是某种方式来判断任务是否已经提交到队列中,所以我最终不会提交两次。在芹菜中有这样做的标准方法吗?

4

3 回答 3

5

I don't think (as Tomek and other have suggested) that using the database is the way to do this locking. django has built-in cache framework, which should be sufficient to accomplish this locking, and must faster. See:

http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#cookbook-task-serial

Django can be configured to use memcached as its cache backend, and this can be distributed across multiple machines ... this seems better to me. Thoughts?

于 2013-07-05T03:53:42.967 回答
5

我用 Redis 解决了这个问题。只需在 redis 中为每个任务设置一个键,然后在任务的 after_return 方法中从 redis 中删除该键。Redis 是轻量级和快速的。

于 2011-11-09T14:18:07.483 回答
1

您可以通过将结果手动存储在数据库中来作弊。让我解释一下这将如何提供帮助。

例如,如果使用 RDBMS(带有列的表 - task_id、state、result):

查看部分:

  1. 使用事务管理。
  2. 使用 SELECT FOR UPDATE 获取 task_id == "long-task-%d" % user_id 的行。SELECT FOR UPDATE 将阻止其他请求,直到这个 COMMITs 或 ROLLBACKs。
  3. 如果它不存在 - 将状态设置为 PENDING 并启动“some_long_task”,结束请求。
  4. 如果状态为 PENDING - 通知用户。
  5. 如果状态为 SUCCESS - 将状态设置为 PENDING,启动任务,返回 'result' 列指向的文件。我基于这样的假设,即您希望在获得结果后重新运行任务。犯罪
  6. 如果状态为 ERROR - 将状态设置为 PENDING,启动任务,通知用户。犯罪

任务部分:

  1. 准备文件,包装在 try,catch 块中。
  2. 成功时 - 使用 state = SUCCESS, result 更新正确的行。
  3. 失败时 - 使用 state = ERROR 更新正确的行。
于 2011-06-29T22:16:49.020 回答