37

asyncio 的文档提供了两个示例,说明如何每两秒打印一次“Hello World”: https ://docs.python.org/3/library/asyncio-eventloop.html#asyncio-hello-world-callback https:// docs.python.org/3/library/asyncio-task.html#asyncio-hello-world-coroutine

我可以从解释器中运行它们,但如果我这样做了,我将无法访问解释器。可以在后台运行异步事件循环,以便我可以继续在解释器中输入命令吗?

4

2 回答 2

60

编辑:

如果使用 Python 3.8 或更高版本,则应使用asynciorepl,如zeronone 的答案中所述。如果使用 3.7 或更低版本,您可以使用此答案。


您可以在后台线程中运行事件循环:

>>> import asyncio
>>> 
>>> @asyncio.coroutine
... def greet_every_two_seconds():
...     while True:
...         print('Hello World')
...         yield from asyncio.sleep(2)
... 
>>> def loop_in_thread(loop):
...     asyncio.set_event_loop(loop)
...     loop.run_until_complete(greet_every_two_seconds())
... 
>>> 
>>> loop = asyncio.get_event_loop()
>>> import threading
>>> t = threading.Thread(target=loop_in_thread, args=(loop,))
>>> t.start()
Hello World
>>> 
>>> Hello World

请注意,您必须调用asyncio.set_event_looploop否则您将收到一条错误消息,指出当前线程没有事件循环。

如果你想从主线程与事件循环交互,你需要坚持loop.call_soon_threadsafe调用。

虽然这种事情是在解释器中进行试验的好方法,但在实际程序中,您可能希望所有代码都在事件循环中运行,而不是引入线程。

于 2014-10-09T05:13:02.637 回答
30

在 Python 3.8 中,您可以使用新的 asyncio REPL。

$ python -m asyncio
>>> async def greet_every_two_seconds():
...     while True:
...         print('Hello World')
...         await asyncio.sleep(2)
...
>>> # run in main thread (Ctrl+C to cancel)
>>> await greet_every_two_seconds()
...
>>> # run in background
>>> asyncio.create_task(greet_every_two_seconds())
于 2019-05-29T08:45:55.110 回答