我有 celery 任务,分组在一起,当组中的一个任务达到最大重试次数时会挂起。
我只是在任务主体中提出异常(即解包和验证),因为我仍在试验任务的重试功能。
这是代码:
@app.task(name='job', bind=True, queue='job')
def process_job(self, job_type, params):
res = None
ret = {"job_status":"SUCCESS", "job_message":"job_process_done"}
res = group(unpack.s(params["s3_location"]),
validate.s(params["s3_location"]))()
return res.get()
@app.task(name='unpack', bind=True, queue='unpack', max_retries=3,
default_retry_delay=1*10)
def unpack(self, s3_location):
ret = {"unpack_status":"SUCCESS", "unpack_message":"unpack_done"}
try:
raise Exception("")
except Exception, ex:
if unpack.request.retries == unpack.max_retries:
raise Exception('UNPACK: Max Retries Reached')
else: unpack.retry(args=[s3_location])
return ret
@app.task(name='validate', bind=True, queue='validate', max_retries=4,
default_retry_delay=1*10)
def validate(self, s3_location):
ret = {"validate_status":"SUCCESS",
"validate_message":"validate_done"}
try:
raise Exception("")
except Exception, ex:
if validate.request.retries == validate.max_retries:
raise Exception('VALIDATE: Max Retries Reached')
else: validate.retry(args=[s3_location])
return ret
当达到验证任务的最大重试次数时,其他任务(解包)以及父任务(process_job)都没有完成。也就是说,它们的状态分别被困在“RETRY”和“STARTED”。以下是快照图像:
1.花显示工作状态
2. 终端显示已达到最大重试次数的 Celery worker
3.验证花的详细状态
4.花中解包详细状态
如何解决未完成任务的问题?我在任务代码/实现中做错了吗?