所以我有一个耗时的函数调用my_heavy_function
,我需要将该输出重定向到调用它的 Web 界面,我有一个向 Web 界面发送消息的方法,让我们调用那个方法async push_message_to_user()
。
基本上,它就像
import time
def my_heavy_function():
time_on = 0
for i in range(20):
time.sleep(1)
print(f'time spend {time_on}')
time_on = time_on+1
async def push_message_to_user(message:str):
# some lib implementation
pass
if __name__ == "__main__":
my_heavy_function() # how push prints to the UI ?
也许有一种方法可以提供my_heavy_function(stdout_obj)
并使用“std_object”(StringIO)来做类似的事情stdout_object.push(f'time spend {time_on}')
。我可以这样做,但是我无法my_heavy_function()
通过异步版本更改push_message_to_user()
直接添加而不是添加print
(其他非 ascyn 例程使用它)
我想要的是什么(伪代码)
with contextlib.redirect_output(my_prints):
my_heavy_function()
while my_prints.readable():
# realtime push
await push_message_to_user(my_prints)
谢谢!