0
import logging
import time
from datetime import datetime

import anyio
import numpy as np
from anyio.streams.memory import MemoryObjectReceiveStream as rstream
from anyio.streams.memory import MemoryObjectSendStream as sstream

# Lower the root logger's threshold to INFO so that the INFO-level messages
# emitted by this script are not filtered out.
logging.getLogger().setLevel(logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


async def ping_num(send: sstream):
    """Send the integers 0 through 99 into ``send``, in order.

    ``send`` is used as an async context manager, so it is closed when the
    function leaves the block, whether the loop finished or sending raised.
    """
    async with send:
        emit = send.send
        for value in range(100):
            await emit(value)


async def pong_num(receive_num: rstream, send_other_num: sstream, key: int):
    """Forward every received number downstream, tagged with ``key``.

    For each ``num`` read from ``receive_num``, the tuple
    ``(key, num, np.sqrt(num))`` is sent to ``send_other_num``.

    Args:
        receive_num: Stream the numbers are read from until it is exhausted.
        send_other_num: Stream the result tuples are written to.
        key: Value placed first in every tuple sent downstream.

    Both streams are closed by the ``async with`` block on exit, so no
    explicit ``close()`` call is needed (the stream would otherwise be closed
    twice).
    """
    async with receive_num, send_other_num:
        async for num in receive_num:
            await send_other_num.send((key, num, np.sqrt(num)))


async def async_sleep_5_and_print(receive_other: rstream):
    """Run ``sync_sleep_5_and_print`` in a worker thread for each received item.

    Every item read from ``receive_other`` gets its own task in a task group;
    the task calls ``anyio.to_thread.run_sync`` with the blocking function and
    the item. The function returns once the stream is exhausted and the task
    group has exited, after which the stream is closed.

    NOTE: no ``limiter`` is passed to ``run_sync``, so anyio's default thread
    limiter applies (40 threads per the anyio documentation), which caps how
    many of these calls can run at the same time.
    """
    async with receive_other, anyio.create_task_group() as workers:
        async for item in receive_other:
            workers.start_soon(
                anyio.to_thread.run_sync, sync_sleep_5_and_print, item
            )


def sync_sleep_5_and_print(item):
    """Log a start line, block for 5 seconds, then log an end line.

    Both lines are emitted through the root-level ``logging.info`` and contain
    the ``datetime.today()`` timestamp taken just before the call, followed by
    ``item``.
    """
    started_at = datetime.today()
    logging.info(f"start:: {started_at} {item=}")
    # Blocking sleep (not an async one): it occupies the calling thread.
    time.sleep(5)
    finished_at = datetime.today()
    logging.info(f"  end:: {finished_at} {item=}")


async def main():
    """Wire up and run the pipeline: one producer, five workers, one consumer.

    Two memory object streams connect the three stages. Every task is handed
    its own clone of the stream end it uses; the original ends are closed as
    soon as the clones have been handed out, because each ``async with`` block
    exits right after scheduling. ``"main end"`` is logged after the task
    group has exited.
    """
    num_tx, num_rx = anyio.create_memory_object_stream()
    result_tx, result_rx = anyio.create_memory_object_stream()
    async with anyio.create_task_group() as tg:
        # Producer: feeds numbers into the first stream.
        async with num_tx:
            tg.start_soon(ping_num, num_tx.clone())
        # Workers: read numbers and write result tuples to the second stream.
        async with num_rx, result_tx:
            for worker_key in range(5):
                tg.start_soon(
                    pong_num, num_rx.clone(), result_tx.clone(), worker_key
                )
        # Consumer: runs the blocking function for every result tuple.
        async with result_rx:
            tg.start_soon(async_sleep_5_and_print, result_rx.clone())
    logger.info("main end")


# When executed as a script, run the pipeline on anyio's event loop.
if __name__ == "__main__":
    anyio.run(main)

日志:

INFO:root:start:: 2021-11-21 14:45:48.113164 item=(0, 0, 0.0)
INFO:root:start:: 2021-11-21 14:45:48.117558 item=(1, 1, 1.0)
INFO:root:start:: 2021-11-21 14:45:48.122124 item=(2, 2, 1.4142135623730951)

...

INFO:root:start:: 2021-11-21 14:45:48.149377 item=(3, 38, 6.164414002968976)
INFO:root:start:: 2021-11-21 14:45:48.154694 item=(4, 39, 6.244997998398398)
INFO:root:  end:: 2021-11-21 14:45:53.115420 item=(0, 0, 0.0)
INFO:root:start:: 2021-11-21 14:45:53.116359 item=(4, 99, 9.9498743710662)

...

原本预计100个任务会一起运行,5秒左右结束,结果却用了15秒。

从日志中可以看出,它似乎同时运行了多达 40 个任务。

我将后端更改为 trio,但出现了同样的问题。

为什么会这样?

有没有办法在上面的代码中解决这个问题?

4

1 回答 1

0

进一步研究 anyio.to_thread.run_sync 之后,我发现它接受一个名为 limiter 的关键字参数。

如果它接收到 None 值,它使用默认限制器。

这个默认限制器的容量似乎被设定为 40,也就是说最多只能同时运行 40 个线程。

所以我添加并修改了以下代码来实现我想要的操作。

可能有人和我有同样的问题,所以我把它作为答案。

from functools import partial

async def async_sleep_5_and_print(receive_other: rstream, max_threads: int = 100):
    """Run ``sync_sleep_5_and_print`` in a worker thread for each received item.

    A dedicated ``anyio.CapacityLimiter`` is passed to
    ``anyio.to_thread.run_sync`` instead of relying on the default limiter, so
    up to ``max_threads`` calls can run at the same time.

    Args:
        receive_other: Stream the items are read from until it is exhausted.
        max_threads: Capacity of the limiter. Defaults to 100, the value that
            was previously hard-coded.
    """
    async with receive_other:
        # Created inside the stream's context so the stream is still closed
        # if constructing the limiter raises (e.g. for an invalid capacity).
        limit = anyio.CapacityLimiter(max_threads)
        # start_soon() only forwards positional arguments, so the keyword
        # argument is bound up front with partial().
        run_sync_with_limit = partial(anyio.to_thread.run_sync, limiter=limit)
        async with anyio.create_task_group() as task_group:
            async for other in receive_other:
                task_group.start_soon(
                    run_sync_with_limit, sync_sleep_5_and_print, other
                )

日志:

INFO:root:start:: 2021-11-21 21:40:24.235837 item=(0, 0, 0.0)
INFO:root:start:: 2021-11-21 21:40:24.237306 item=(1, 1, 1.0)
INFO:root:start:: 2021-11-21 21:40:24.237800 item=(2, 2, 1.4142135623730951)
INFO:root:start:: 2021-11-21 21:40:24.238302 item=(3, 3, 1.7320508075688772)
INFO:root:start:: 2021-11-21 21:40:24.238695 item=(4, 4, 2.0)

...

INFO:root:  end:: 2021-11-21 21:40:29.302220 item=(4, 89, 9.433981132056603)
INFO:root:  end:: 2021-11-21 21:40:29.302260 item=(4, 94, 9.695359714832659)
INFO:root:  end:: 2021-11-21 21:40:29.303010 item=(0, 95, 9.746794344808963)
INFO:root:  end:: 2021-11-21 21:40:29.303086 item=(2, 97, 9.848857801796104)
INFO:__main__:main end

我是在 anyio._backends._asyncio.current_default_thread_limiter 中发现这一点的。

于 2021-11-21T12:46:03.887 回答