我正在尝试在当前系统中实现一个redis 队列。将job
被发送到另一个模块,它应该等到工作完成并返回结果job.result
,然后继续:
with Connection(redis_connection):
job = job_queue.enqueue(worker_func, func_input1, func_input2)
print("waiting for result")
print(datetime.datetime.now())
while job.result is None:
pass
print(datetime.datetime.now())
print("got result")
# next step
next_step_func(job.result)
...
我在这里面临两个问题:
- 忙碌的等待,
while job.result is None
需要很长时间。我的处理时间worker_func
约为 2-3 秒,其中涉及在另一台服务器上调用 API,但繁忙的等待while job.result is None
本身又需要 >= 3 秒,总共增加了 >= 5 秒。我很肯定等待发生在执行之后,while job.result is None
因为我为worker_func
和都添加了日志while job.result is None
:
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT start work
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:57.601189
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.075137
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT end work
...
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT waiting for result
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:53.704891
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.096009
正如你在上面看到的,忙等待循环发生在worker_func
完成之后。
2、有没有其他优雅的方式来实现这个同步等待而不是忙循环?我认为这里的繁忙循环绝对不是最好的实现,因为它会消耗大量的 CPU 资源。
谢谢!
-- 编辑我上面的代码以提供更清晰的上下文
我需要从调用next_step_func(job.result)
位置返回的值。job_queue.enqueue
所以更清晰的结构是:
def endpoint():
with Connection(redis_connection):
job = job_queue.enqueue(worker_func, func_input1, func_input2)
print("waiting for result")
print(datetime.datetime.now())
while job.result is None:
pass
print(datetime.datetime.now())
print("got result")
# next step
return next_step_func(job.result)
...
所以痛点是我需要job.result
能够返回endpoint()
,但是 Job Callback 会将我的工作带到不同的环境中on_success
。