2

我试图更好地理解有关 Celery 中结果和错误的常见策略。

我看到结果有状态/状态并在请求时存储结果——我什么时候使用这些数据?错误处理和数据存储是否应该包含在任务中?

这是一个示例场景,以防它有助于更​​好地理解我的目标:

我有一个对用户地址进行 goeocodes 的地理编码任务。如果任务失败或成功,我想更新数据库中的一个字段让用户知道。(错误处理)成功后,我希望将地理编码数据插入数据库(数据存储)

应该采取什么方法?

4

1 回答 1

1

让我先说我自己仍然对 Celery 有感觉。话虽这么说,我对如何解决这个问题有一些普遍的看法,由于没有其他人回应,我会试一试。

根据您所写的内容,一个相对简单(尽管我怀疑未优化)的解决方案是遵循文档中博客评论垃圾邮件任务示例的大致轮廓。

应用模型.py

class Address(models.Model):

  GEOCODE_STATUS_CHOICES = (
    ('pr', 'pre-check'),
    ('su', 'success'), 
    ('fl', 'failed'),
  )

  address = models.TextField()
  ...
  geocode = models.TextField()
  geocode_status = models.CharField(max_length=2, 
                                    choices=GEOCODE_STATUS_CHOICES, 
                                    default='pr')

class AppUser(models.Model):
  name = models.CharField(max_length=100)
  ...
  address = models.ForeignKey(Address)

app.tasks.py

  from celery import task
  from app.models import Address, AppUser
  from some_module import geocode_function #assuming this returns a string

  @task()
  def get_geocode(appuser_pk):
    user = AppUser.objects.get(pk=appuser_pk)
    address = user.address

    try:
      result = geocode_function(address.address)
      address.geocode = result
      address.geocode_status = 'su' #set address object as successful
      address.save()
      return address.geocode  #this is optional -- your task doesn't have to return anything
                                 on the other hand, you could also choose to decouple the geo-
                                 code function from the database update for the object instance.   
                                 Also, if you're thinking about chaining tasks together, you             
                                 might think about if it's advantageous to pass a parameter as 
                                 an input or partial input into the child task.

      except Exception as e:     
        address.geocode_status = 'fl' #address object fails
        address.save()
        #do something_else()
        raise  #re-raise the error, in case you want to trigger retries, etc

应用程序.views.py

from app.tasks import *
from app.models import *
from django.shortcuts import get_object_or_404

    def geocode_for_address(request, app_user_pk):
      app_user = get_object_or_404(AppUser, pk=app_user_pk)

     ...etc.etc.  --- **somewhere calling your tasks with appropriate args/kwargs

我相信这符合您上面概述的最低要求。我故意让视图未开发,因为我不知道您到底想如何触发它。当他们的地址无法进行地理编码时,听起来您可能还需要某种用户通知(“我想更新数据库中的一个字段让用户知道”)。在不了解此要求的具体细节的情况下,我认为这听起来可能最好在您的 html 模板中完成(如果 instance.attribute 值为 X,则在模板中显示 q)或使用 django.signals(设置当 user.address.geocode_status 切换到失败时发出信号——例如,通过电子邮件通知用户等)。

在对上面代码的注释中,我提到了解耦和链接上面 get_geocode 任务的组成部分的可能性。您还可以考虑通过编写自定义错误处理程序任务并使用link_error 参数(例如 add.apply_async((2, 2)、link_error=error_handler.s()、其中error_handler已被定义为 app.tasks.py 中的任务)。此外,无论您选择通过主任务(get_geocode)还是通过链接的错误处理程序来处理错误,我认为您会想要更具体关于如何处理不同类型的错误(例如,对连接错误做一些不同于地址数据格式不正确的事情)。

我怀疑有更好的方法,而且我刚刚开始了解通过链接任务、使用组和和弦等可以获得的创造力。希望这至少可以帮助你思考一些可能性。我会把它留给其他人推荐最佳实践。

于 2012-12-27T23:21:28.857 回答