11

我们在 WSGI 应用程序中使用RQ。我们所做的是在运行任务的不同后端服务器中有几个不同的进程,连接到(可能)几个不同的任务服务器。为了更好地配置这个设置,我们在系统中使用了一个自定义管理层,它负责运行工作人员、设置任务队列等。

当作业失败时,我们希望实现重试,在延迟增加后重试作业几次,最终要么完成它,要么让它失败并在我们的日志系统中记录一个错误条目。但是,我不确定应该如何实施。我已经创建了一个自定义工作脚本,它允许我们将错误记录到我们的数据库中,我第一次重试的尝试是这样的:

# This handler would ideally wait some time, then requeue the job.
def worker_retry_handler(job, exc_type, exc_value, tb):
    print 'Doing retry handler.'
    current_retry = job.meta[attr.retry] or 2

    if current_retry >= 129600:
        log_error_message('Job catastrophic failure.', ...)
    else:
        current_retry *= 2

        log_retry_notification(current_retry)
        job.meta[attr.retry] = current_retry
        job.save()
        time.sleep(current_retry)

        job.perform()

return False

正如我所提到的,我们在工作文件中也有一个函数,它可以正确解析它应该连接的服务器,并可以发布作业。问题不一定是如何发布作业,而是如何处理您在异常处理程序中获得的作业实例。

任何帮助将不胜感激。如果有关于更好的方法的建议或指示,这也很好。谢谢!

4

2 回答 2

1

我看到两个可能的问题:

  1. 你应该有一个返回值。False 可防止作业发生默认异常处理(请参阅本页最后一部分:http: //python-rq.org/docs/exceptions/

  2. 我认为当您的处理程序被调用时,该作业不再排队。我不是 100% 肯定的(特别是考虑到我上面提到的文档),但如果它在失败的队列中,你可以调用 requeue_job(job.id) 重试它。如果不是(听起来好像不是),您可能可以获取正确的队列并直接加入队列。

于 2013-01-18T16:11:02.780 回答
0

强文本我有一个更漂亮的解决方案

from rq import Queue, Worker
from redis import Redis

redis_conn = Redis(host=REDIS_HOST, health_check_interval=30)
queues = [
    Queue(queue_name, connection=redis_conn, result_ttl=0) 
    for queue_name in ["Low", "Fast"]
]
worker = Worker(queues, connection=redis, exception_handlers=[retry_handler])


def retry_handler(job, exc_type, exception, traceback):
    if isinstance(exception, RetryException):
        sleep(RetryException.sleep_time)
        job.requeue()
        return False

处理程序本身负责决定异常处理是否完成,或者是否应该落入堆栈中的下一个处理程序。处理程序可以通过返回一个布尔值来表明这一点。False表示停止处理异常,True表示继续并下降到堆栈上的下一个异常处理程序。

重要的是要知道,默认情况下,当处理程序没有明确的返回值(因此None)时,这将被解释为True(即继续下一个处理程序)。

要防止处理程序链中的下一个异常处理程序执行,请使用不会失败的自定义异常处理程序,例如:

于 2020-09-23T16:06:34.063 回答