我尝试了以下方法:
loop = asyncio.get_running_loop
task1 = asyncio.create_task(any_coroutine())
if task1 in asyncio.all_tasks(loop):
do something...
但它永远不会满足 if 条件。有人可以帮忙吗?
我尝试了以下方法:
loop = asyncio.get_running_loop
task1 = asyncio.create_task(any_coroutine())
if task1 in asyncio.all_tasks(loop):
do something...
但它永远不会满足 if 条件。有人可以帮忙吗?
根据文档,作为create_task调用,get_running_loop您不必获取事件循环 - 假设您已经在运行事件循环,因此get_running_loop()没有引发您的代码执行的以下错误:
RuntimeError: no running event loop
相反,尝试 newasync和await关键字,循环参数将在 python 3.10 上被弃用,所以如果你正在学习 asyncio,它会更好。
以下示例将正确满足if块中的条件。
import asyncio
async def my_coro():
await asyncio.sleep(5)
async def main():
task = asyncio.create_task(my_coro())
if task in asyncio.all_tasks():
print('Task found!')
else:
print('Missed!')
if __name__ == '__main__':
asyncio.run(main())