0

所以我有一个耗时的函数调用my_heavy_function,我需要将该输出重定向到调用它的 Web 界面,我有一个向 Web 界面发送消息的方法,让我们调用那个方法async push_message_to_user()

基本上,它就像

import time 

def my_heavy_function():
    time_on = 0 
    for i in range(20):
        time.sleep(1)
        print(f'time spend {time_on}')
        time_on = time_on+1

async def push_message_to_user(message:str):
    # some lib implementation 
    pass

if __name__ == "__main__":
    my_heavy_function() # how push prints to the UI ?
    

也许有一种方法可以提供my_heavy_function(stdout_obj) 并使用“std_object”(StringIO)来做类似的事情stdout_object.push(f'time spend {time_on}')。我可以这样做,但是我无法my_heavy_function()通过异步版本更改push_message_to_user()直接添加而不是添加print(其他非 ascyn 例程使用它)

我想要的是什么(伪代码)

with contextlib.redirect_output(my_prints):
    my_heavy_function()
    while my_prints.readable():
        # realtime push
        await push_message_to_user(my_prints)

谢谢!

4

1 回答 1

0

感谢您的评论,@HTF我终于设法解决了janus的问题。我复制了 repo 的示例,并进行了修改以接收可变数量的消息(因为我不知道my_heavy_function()将使用多少次迭代)

import asyncio
import janus
import time 

def my_heavy_function(sync_q):
    for i in range(10):
        sync_q.put(i)
        time.sleep(1)
    sync_q.put('end')    # is there a more elegant way to do this ?
    sync_q.join()


async def async_coro(async_q):
    while True:
        val = await async_q.get()
        print(f'arrived {val}')
        # send val to UI 
        # await push_message_to_user(val)
        async_q.task_done()
        if val == 'end': 
            break

async def main():
    queue = janus.Queue()
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(None, my_heavy_function, queue.sync_q)
    await async_coro(queue.async_q)
    await fut
    queue.close()
    await queue.wait_closed()


asyncio.run(main())
于 2021-04-29T00:12:20.947 回答