0

我正在努力将以前同步的网络服务器作为同步服务器。我的大部分功能都是同步的,我想简单地从现有代码中进行异步调用以避免异步蠕变。nest_asyncio 似乎通过使 run_until_complete 可重入来允许这样做。

但是,虽然这适用于 1 个可重入调用,但我遇到了两个死锁:

import asyncio
import functools
import time
from random import random
import nest_asyncio
nest_asyncio.apply()

def sync(corot, loop=None):
    """
    Make a synchronous function from an asynchronous one.
    :param corot:
    :return:
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    result, = loop.run_until_complete(asyncio.gather(corot))
    return result

async def sync_to_corountine(func, *args, **kw):
    """
    Make a coroutine from a synchronous function.
    """
    try:
        return func(*args, *kw)
    finally:
        # every async needs an await.
        await asyncio.sleep(0)




def main():
    async def background(timeout):
        await asyncio.sleep(timeout)
        print(f"Background: {timeout}")

    loop = asyncio.get_event_loop()
    # Run some background work to check we are never blocked
    bg_tasks = [
        loop.create_task(background(i))
        for i in range(10)
    ]



    async def long_running_async_task(result):
        # Simulate slow IO
        print(f"...START long_running_async_task [{result}]")
        await asyncio.sleep(1)
        print(f"...END   long_running_async_task [{result}]")
        return result

    def sync_function_with_async_dependency(result):
        print(f"...START sync_function_with_async_dependency [{result}]")
        result = sync(long_running_async_task(result), loop=loop)
        print(f"...END   sync_function_with_async_dependency [{result}]")
        return result

    # Call sync_function_with_async_dependency
    # One reentrant task is OK
    # Multiple reentrant tasks=>fails to exit
    n = 2
    for i in range(n):
        bg_tasks.append(sync_to_corountine(sync_function_with_async_dependency, i))
    # for i in range(n):
    #     bg_tasks.append(long_running_async_task(i))

    # OK
    # bg_tasks.append(long_running_async_task(123))
    # bg_tasks.append(long_running_async_task(456))

    task = asyncio.gather(*bg_tasks)  # , loop=loop)
    loop.run_until_complete(task)

if __name__ == '__main__':
    main()

...START sync_function_with_async_dependency [0]
...START sync_function_with_async_dependency [1]
Background: 0
...START long_running_async_task [0]
...START long_running_async_task [1]
Background: 1
...END   long_running_async_task [0]
...END   long_running_async_task [1]
...END   sync_function_with_async_dependency [1]
Background: 2
Background: 3
Background: 4

...我们失踪了

...END   sync_function_with_async_dependency [0]

我是否正确使用了nest_asyncio?

4

1 回答 1

1

如果要将异步调用嵌套在同步调用中,则需要使用 gevent,否则会导致饥饿问题。

asyncio 的整个设计是异步例程调用异步例程,但同步例程不能(通过运行除外)。他们这样做是因为他们觉得其他任何事情都会导致代码过于复杂,难以理解。可能 asyncio 中的错误也更难修复。

猴子补丁nest_async 试图打破这个设计原则,但它并没有很好地完成它,因为嵌套运行不会给在嵌套运行之外安排的任务时间。如果您在嵌套运行中的代码上花费太多时间,这可能会导致饥饿。

我试图向作者解释这一点,但他仍在苦苦挣扎:

https://github.com/erdewit/nest_asyncio/issues/36

于 2020-10-20T16:57:24.000 回答