
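I'm running a large number of CPU-intensive computations in separate processes and want to run a callback as each job finishes. To do this, I'm using asyncio together with a concurrent.futures.ProcessPoolExecutor and setting a callback on each Future via add_done_callback: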

import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial
import time
import random

def run_job(job):
    length = random.randrange(1, 10)
    time.sleep(length)
    return f"slept for {length} s"

def on_complete(future, job):
    print(f"Finished computing job {job}: {future.result()}")

async def run_jobs(executor, jobs):
    """Run computations in separate processes and update results asynchronously."""
    loop = asyncio.get_running_loop()  # the loop that asyncio.run() is driving
    tasks = []

    print(f"Scheduling {len(jobs)} jobs")
    for job in jobs:
        future = loop.run_in_executor(
            executor,
            run_job,
            job,
        )
        future.add_done_callback(
            partial(on_complete, job=job)
        )

        tasks.append(future)

    await asyncio.wait(tasks)

if __name__ == "__main__":
    jobs = [str(jobnum) for jobnum in range(1, 10)]

    with ProcessPoolExecutor() as executor:
        asyncio.run(run_jobs(executor, jobs))

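This works fine, except when the computation is interrupted, e.g. by a KeyboardInterrupt: stderr then gets spammed with messages from every child process saying RuntimeError: Event loop is closed. An answer to a related question suggests using the Python 3.7+ function asyncio.run, since it handles cleanup after an interrupt, but that does not seem to apply to jobs running in separate processes.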

The traceback of one of the RuntimeErrors emitted after a KeyboardInterrupt points to a possible cause:

exception calling callback for <Future at 0x7fd4f679ea60 state=finished returned str>
Traceback (most recent call last):
  File "/path/to/lib/python3.8/concurrent/futures/_base.py", line 328, in _invoke_callbacks
    callback(self)
  ...other stuff...
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
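The final frame of this traceback is raised by asyncio's event loop, not by the job. In CPython, loop.run_in_executor wraps the executor's future with asyncio.wrap_future, and the done-callback that wrapping installs hands the result back to the loop through loop.call_soon_threadsafe. The minimal sketch below, assuming CPython's default event loop, isolates the two halves of that failure: asyncio.run closes its loop before returning, and a later call_soon_threadsafe on the closed loop raises this exact error. capture_loop and schedule_after_close are hypothetical helper names used only for illustration.

```python
import asyncio


async def capture_loop():
    # Hypothetical helper: return the loop that asyncio.run() created.
    return asyncio.get_running_loop()


def schedule_after_close():
    # Hypothetical helper: show what a late callback runs into.
    loop = asyncio.run(capture_loop())
    # asyncio.run() closes its loop before returning.
    assert loop.is_closed()
    try:
        # A done-callback firing in another thread would use this call
        # to hand its result back to the (now closed) loop.
        loop.call_soon_threadsafe(print, "never printed")
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(schedule_after_close())  # Event loop is closed
```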

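My guess is that when asyncio.run cancels the running tasks while handling the interrupt, the callback on each Future is called in its separate process, but asyncio.run closes the event loop before those callbacks get to run, because it is not blocked waiting for them.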

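Edit: commenting out the add_done_callback block when creating the futures does not change the result, which suggests the problem is not related to my callbacks after all but is the same issue reported elsewhere.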
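The observation in the edit can be reproduced deterministically, with no add_done_callback and no KeyboardInterrupt. This is a sketch under stated assumptions: a ThreadPoolExecutor stands in for the ProcessPoolExecutor purely so the job can be held open with a threading.Event, and run_until_loop_closes and RecordCollector are hypothetical names. The coroutine starts a job with run_in_executor and returns without awaiting it, roughly what happens after asyncio.run cancels run_jobs, since asyncio.wait does not cancel the futures it was waiting on. Once the loop is closed, the job is released and concurrent.futures logs "exception calling callback for <Future ... state=finished returned str>" with RuntimeError: Event loop is closed. This matches the traceback on Python 3.8; later CPython releases may handle a closed loop here differently.

```python
import asyncio
import logging
import threading
from concurrent.futures import ThreadPoolExecutor


class RecordCollector(logging.Handler):
    """Hypothetical helper: keeps every log record it receives."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


def run_until_loop_closes():
    """Hypothetical helper: close the loop while an executor job still runs.

    Returns what concurrent.futures logged when the job's done-callbacks
    ran. No add_done_callback is used anywhere.
    """
    collector = RecordCollector()
    cf_logger = logging.getLogger("concurrent.futures")
    cf_logger.addHandler(collector)
    release = threading.Event()

    def blocking_job():
        # Stand-in for a long job that outlives the event loop.
        release.wait()
        return "finished"

    async def main(executor):
        loop = asyncio.get_running_loop()
        # run_in_executor alone registers a callback on the executor's
        # future that will later call loop.call_soon_threadsafe().
        loop.run_in_executor(executor, blocking_job)
        # Return without awaiting the job, so asyncio.run() closes the loop.

    try:
        with ThreadPoolExecutor(max_workers=1) as executor:
            asyncio.run(main(executor))
            release.set()  # the job completes only after the loop is closed
        # Leaving the with-block waits for the worker thread to finish.
    finally:
        cf_logger.removeHandler(collector)
    return collector.records


if __name__ == "__main__":
    for record in run_until_loop_closes():
        print(record.getMessage(), "->", repr(record.exc_info[1]))
```

With a real process pool, the executor's futures are completed by a helper thread in the parent process, so the same asyncio-internal callback is what fails there too.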
