0

我正在尝试将现有的 celery 组调用转换为和弦以防止死锁。之前的代码有重试和过期时间。我设法在没有这些设置的情况下使和弦工作,但是当我尝试应用设置时,我看不到正在运行的任务。我在文档中没有看到任何关于在整个和弦上应用相同设置的内容。我正在运行 celery 3.1.6 版。

以前的代码:

jobs = group([reset_device.s(topoid, dev_list[i], 
              waittime_list[i], skipflag) for i in range(len(dev_list))]
              ).apply_async(expires=waittime, retry=True, retry_policy={
                                                    'max_retries': 3,
                                                    'interval_start': 0.5,
                                                    'interval_step': 0.2,
                                                    'interval_max': 0.2})
results = jobs.join_native(timeout=waittime + 600, propagate=True)

工作和弦(无设置):

jobs = chord([reset_device.s(topoid, dev_list[i], 
              waittime_list[i], skipflag) for i in range(len(dev_list))])(callback)

非工作和弦#1:

jobs = chord([reset_device.s(topoid, dev_list[i], waittime_list[i],
             skipflag).set(expires=datetime.now() + timedelta(seconds=waittime)).set(retry=True).set(retry_policy=retry_policy)
              for i in range(len(dev_list))])(callback)

非工作和弦#2

jobs = chord([reset_device.subtask(args=(topoid, dev_list[i], waittime_list[i],skipflag), 
              expires=datetime.now()+timedelta(seconds=waittime), retry=True, retry_policy=retry_policy) 
             for i in range(len(dev_list))])(callback)

在 #1 和 #2 情况下,和弦中的任务似乎都没有运行。如何为和弦中调用的每个任务应用过期时间并重试?

4

1 回答 1

1

我想通了,这是一个混合的问题。

第一个问题是 expires 字段不接受整数,只接受日期时间对象,在一个和弦内(也可能是组和链),尽管文档没有做任何区分。这已在更高版本中修复,我使用 3.1.25 进行了测试,并且能够验证修复。

第二个问题是 celery 3.1.6 不会在和弦中记录错误(我也认为是组和链)。这也已修复,我在 3.1.25 中进行了测试,并且能够看到失败。

第三个问题与错误消息有关:

[2017-08-07 18:39:56,043: ERROR/Worker-5] Chord '98246849-0d5d-4be2-85e3-3fc08e90011d' raised: TaskRevokedError(u'expired',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/builtins.py", line 90, in unlock_chord
ret = j(timeout=3.0, propagate=propagate)
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 691, in join_native
raise value
TaskRevokedError: expired

这是因为时区不对。我使用了datetime.now()而不是datetime.utcnow(),它解决了这个问题并在 3.1.6 中工作。

或者,我可以设置 celery config CELERY_ENABLE_UTC = False,默认情况下设置为 True 。这让我很困惑,因为我们已将配置设置CELERY_TIMEZONE为当地时间。expires 字段与 datetime 对象一起使用时,根据CELERY_ENABLE_UTC设置的值使用本地时间或 UTC。我建议保持两个配置设置相同。

有趣的是,回调函数被创建并轮询以查看 chord 是否已完成,尽管 chord 任务从未执行并且它只是永远停留在那里。我相信这可能已在 celery 4.1 中修复

于 2017-08-08T02:03:37.223 回答