import logging
import time
from datetime import datetime
import anyio
import numpy as np
from anyio.streams.memory import MemoryObjectReceiveStream as rstream
from anyio.streams.memory import MemoryObjectSendStream as sstream
logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger(__name__)
async def ping_num(send: sstream):
async with send:
for num in range(100):
await send.send(num)
async def pong_num(receive_num: rstream, send_other_num: sstream, key: int):
async with receive_num, send_other_num:
async for num in receive_num:
await send_other_num.send((key, num, np.sqrt(num)))
send_other_num.close()
async def async_sleep_5_and_print(receive_other: rstream):
async with receive_other:
async with anyio.create_task_group() as task_group:
async for other in receive_other:
task_group.start_soon(
anyio.to_thread.run_sync, sync_sleep_5_and_print, other
)
def sync_sleep_5_and_print(item):
logging.info(f"start:: {datetime.today()} {item=}")
time.sleep(5)
logging.info(f" end:: {datetime.today()} {item=}")
async def main():
send, receive = anyio.create_memory_object_stream()
send_other, receive_other = anyio.create_memory_object_stream()
async with anyio.create_task_group() as task_group:
async with send:
task_group.start_soon(ping_num, send.clone())
async with receive, send_other:
for key in range(5):
task_group.start_soon(
pong_num, receive.clone(), send_other.clone(), key
)
async with receive_other:
task_group.start_soon(async_sleep_5_and_print, receive_other.clone())
logger.info("main end")
if __name__ == "__main__":
anyio.run(main)
日志:
INFO:root:start:: 2021-11-21 14:45:48.113164 item=(0, 0, 0.0)
INFO:root:start:: 2021-11-21 14:45:48.117558 item=(1, 1, 1.0)
INFO:root:start:: 2021-11-21 14:45:48.122124 item=(2, 2, 1.4142135623730951)
...
INFO:root:start:: 2021-11-21 14:45:48.149377 item=(3, 38, 6.164414002968976)
INFO:root:start:: 2021-11-21 14:45:48.154694 item=(4, 39, 6.244997998398398)
INFO:root: end:: 2021-11-21 14:45:53.115420 item=(0, 0, 0.0)
INFO:root:start:: 2021-11-21 14:45:53.116359 item=(4, 99, 9.9498743710662)
...
原本预计100个任务会一起运行,5秒左右结束,结果却用了15秒。
从日志中可以看出,它似乎同时运行了多达 40 个任务。
我将后端更改为 trio,但出现了同样的问题。
为什么会这样?
有没有办法在上面的代码中解决这个问题?