0

我有 celery 任务,分组在一起,当组中的一个任务达到最大重试次数时会挂起。

我只是在任务主体中提出异常(即解包和验证),因为我仍在试验任务的重试功能。

这是代码:

@app.task(name='job', bind=True, queue='job')
def process_job(self, job_type, params):
    res = None
    ret = {"job_status":"SUCCESS", "job_message":"job_process_done"}
    res = group(unpack.s(params["s3_location"]), 
          validate.s(params["s3_location"]))()
    return res.get()

@app.task(name='unpack', bind=True, queue='unpack', max_retries=3, 
          default_retry_delay=1*10)
def unpack(self, s3_location):
    ret = {"unpack_status":"SUCCESS", "unpack_message":"unpack_done"}
    try:
        raise Exception("")
    except Exception, ex:
        if unpack.request.retries == unpack.max_retries:
            raise Exception('UNPACK: Max Retries Reached')
        else: unpack.retry(args=[s3_location])
    return ret

@app.task(name='validate', bind=True, queue='validate', max_retries=4, 
          default_retry_delay=1*10)
def validate(self, s3_location):
    ret = {"validate_status":"SUCCESS", 
           "validate_message":"validate_done"}
    try:
        raise Exception("")
    except Exception, ex:
        if validate.request.retries == validate.max_retries:
            raise Exception('VALIDATE: Max Retries Reached')
        else: validate.retry(args=[s3_location])
    return ret

当达到验证任务的最大重试次数时,其他任务(解包)以及父任务(process_job)都没有完成。也就是说,它们的状态分别被困在“RETRY”和“STARTED”。以下是快照图像:

1.花显示工作状态

在此处输入图像描述

2. 终端显示已达到最大重试次数的 Celery worker

在此处输入图像描述

3.验证花的详细状态

在此处输入图像描述

4.花中解包详细状态

在此处输入图像描述

如何解决未完成任务的问题?我在任务代码/实现中做错了吗?

4

1 回答 1

0

对于那些遇到同样问题的人,我可以通过将每个任务的 max_retries 间隔设置为至少 2 来解决这个问题(即 max_retries=3 用于解包,max_retries=5 用于验证)。通过这样做,另一个任务的重试尝试将被执行并在 try-catch 块中捕获。

希望这可以帮助!

于 2018-02-04T15:50:28.520 回答