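I'm running a large number of CPU-intensive computations in separate processes and want to run a callback as each job finishes. To do this, I'm using asyncio together with a concurrent.futures.ProcessPoolExecutor, and setting a callback on each Future via add_done_callback: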
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial
import time
import random


def run_job(job):
    # Simulate a CPU-bound job of random length (runs in a worker process)
    length = random.randrange(1, 10)
    time.sleep(length)
    return f"slept for {length} s"


def on_complete(future, job):
    # Called in the main process's event loop once the job's Future is done
    print(f"Finished computing job {job}: {future.result()}")


async def run_jobs(executor, jobs):
    """Run computations in separate processes and update results asynchronously."""
    loop = asyncio.get_event_loop()
    tasks = []
    print(f"Scheduling {len(jobs)} jobs")
    for job in jobs:
        # Submit the job to the process pool; returns an asyncio Future
        future = loop.run_in_executor(
            executor,
            run_job,
            job,
        )
        future.add_done_callback(
            partial(on_complete, job=job)
        )
        tasks.append(future)
    await asyncio.wait(tasks)


if __name__ == "__main__":
    jobs = [str(jobnum) for jobnum in range(1, 10)]
    with ProcessPoolExecutor() as executor:
        asyncio.run(run_jobs(executor, jobs))
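This works fine, except when the computation is interrupted, e.g. by a KeyboardInterrupt. stderr then gets spammed with messages from each subprocess saying RuntimeError: Event loop is closed. An answer to this question suggests using the Python 3.7+ function asyncio.run, since it handles cleanup after an interrupt, but that doesn't seem to work for jobs running in separate processes.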
The traceback of one of the RuntimeErrors emitted after a KeyboardInterrupt points to a possible cause:
exception calling callback for <Future at 0x7fd4f679ea60 state=finished returned str>
Traceback (most recent call last):
  File "/path/to/lib/python3.8/concurrent/futures/_base.py", line 328, in _invoke_callbacks
    callback(self)
  ...other stuff...
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
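A minimal sketch of the failure mode this traceback seems to describe. It rests on an assumption about asyncio's internals that the (partly elided) traceback does not show directly: the Future returned by run_in_executor is tied to the underlying concurrent.futures.Future by an internal done-callback, and that callback hands the result back to the event loop via loop.call_soon_threadsafe. Calling that method on a loop that asyncio.run has already closed raises exactly this error. The coroutine name get_loop is hypothetical, used only for illustration.

```python
import asyncio


async def get_loop():
    # Hypothetical helper: return the loop that asyncio.run() created
    return asyncio.get_running_loop()


loop = asyncio.run(get_loop())  # asyncio.run() closes the loop before returning

error_message = None
try:
    # Assumed to mirror what a late-finishing worker's result delivery does:
    # schedule work on the (now closed) loop from outside it
    loop.call_soon_threadsafe(print, "never printed")
except RuntimeError as exc:
    error_message = str(exc)

print(loop.is_closed())  # True
print(error_message)     # Event loop is closed
```

If that assumption holds, any worker that finishes after asyncio.run returns would trigger one such error, which would match seeing one message per job.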
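My guess is that when asyncio.run cancels the running tasks while handling the interrupt, the callbacks on each Future from the separate processes still get called, but asyncio.run closes the event loop before those callbacks run, because it isn't blocked waiting for them to execute.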
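To check the ordering this guess relies on, here is a small sketch in plain asyncio, with no executor involved (pending_job, main and loop_ref are hypothetical names). It shows that once the main coroutine returns, asyncio.run cancels any still-pending tasks and then closes the loop. The same cleanup is assumed to run when a KeyboardInterrupt escapes, since it sits in asyncio.run's cleanup path rather than in the normal-return path.

```python
import asyncio

events = []    # records what happened to the pending task
loop_ref = []  # keeps a reference to the loop asyncio.run() created


async def pending_job():
    try:
        await asyncio.sleep(3600)  # stands in for a long-running awaitable
    except asyncio.CancelledError:
        events.append("cancelled")
        raise


async def main():
    loop_ref.append(asyncio.get_running_loop())
    asyncio.create_task(pending_job())
    await asyncio.sleep(0)  # let pending_job start and reach its await
    # main() returns while pending_job is still waiting


asyncio.run(main())
print(events)                   # ['cancelled']
print(loop_ref[0].is_closed())  # True
```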
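Edit: I commented out the add_done_callback block when creating the futures, and it doesn't change the result, which suggests the problem isn't related to the callbacks after all but is the same issue seen here.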
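One possible reading of that result, offered as an assumption rather than a diagnosis: run_in_executor returns a plain asyncio Future, not a Task, and asyncio.run only cancels Tasks during its cleanup. The sketch below uses loop.create_future() as a stand-in for such a Future (state is a hypothetical holder dict) and shows it is left pending on the closed loop, while a Task created alongside it is cancelled.

```python
import asyncio

state = {}  # hypothetical holder so the objects outlive asyncio.run()


async def main():
    loop = asyncio.get_running_loop()
    # Plain Future, the same type run_in_executor() returns (not a Task)
    state["future"] = loop.create_future()
    # A Task: asyncio.run() cancels pending Tasks during cleanup
    state["task"] = asyncio.create_task(asyncio.sleep(3600))
    await asyncio.sleep(0)  # let the task start


asyncio.run(main())
print(state["task"].cancelled())  # True
print(state["future"].done())     # False: still pending on a closed loop
```

Under this assumption, nothing resolves or cancels the executor-backed Futures before the loop closes, with or without add_done_callback attached.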