0

我正在尝试在当前系统中实现一个redis 队列。job被发送到另一个模块,它应该等到工作完成并返回结果job.result,然后继续:

with Connection(redis_connection):
    job = job_queue.enqueue(worker_func, func_input1, func_input2)

print("waiting for result")
print(datetime.datetime.now())
while job.result is None:
    pass
print(datetime.datetime.now())
print("got result")

# next step
next_step_func(job.result)

...

我在这里面临两个问题:

  1. 忙碌的等待,while job.result is None需要很长时间。我的处理时间worker_func约为 2-3 秒,其中涉及在另一台服务器上调用 API,但繁忙的等待while job.result is None本身又需要 >= 3 秒,总共增加了 >= 5 秒。我很肯定等待发生在执行之后,while job.result is None因为我为worker_func和都添加了日志while job.result is None
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT start work
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:57.601189
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.075137
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT end work
...
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT waiting for result
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:53.704891
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.096009

正如你在上面看到的,忙等待循环发生在worker_func完成之后。

2、有没有其他优雅的方式来实现这个同步等待而不是忙循环?我认为这里的繁忙循环绝对不是最好的实现,因为它会消耗大量的 CPU 资源。

谢谢!

-- 编辑我上面的代码以提供更清晰的上下文

我需要从调用next_step_func(job.result)位置返回的值。job_queue.enqueue所以更清晰的结构是:

def endpoint():
    with Connection(redis_connection):
        job = job_queue.enqueue(worker_func, func_input1, func_input2)

    print("waiting for result")
    print(datetime.datetime.now())
    while job.result is None:
        pass
    print(datetime.datetime.now())
    print("got result")

    # next step
    return next_step_func(job.result)

...

所以痛点是我需要job.result能够返回endpoint(),但是 Job Callback 会将我的工作带到不同的环境中on_success

4

1 回答 1

1

文档建议使用作业回调作为选项:

def job_succeeded(job, connection, result, *args, **kwargs):
    next_step_func(job.result)

def job_failed(job, connection, type, value, traceback):
    # react to the error
    pass

with Connection(redis_connection):
    args = (func_input1, func_input2)
    job_queue.enqueue(worker_func, args=args, on_success=job_succeeded, on_failure=job_failed)
于 2021-07-12T11:35:59.303 回答