0

我想将 ProcessPoolExecutor 与 asyncio 结合,并发运行 TestClass 中的阻塞函数。每个任务都会长时间运行,因此我需要一个可靠的关闭流程,以便在退出脚本时干净地结束所有工作。我应该在哪里处理 KeyboardInterrupt,才能平稳地关闭所有任务和进程?我搜索了很多相关主题,但都没有解决我的问题。希望能得到帮助,提前致谢!

import asyncio
from concurrent.futures import ProcessPoolExecutor


class TestClass:
    """Minimal stand-in for the asker's class; task() builds it in a pool worker."""

    def __init__(self) -> None:
        # Two demo attributes, assigned left to right.
        self.value1, self.value2 = 1, 2


async def task(loop, executor_processes, i):
    """Construct one ``TestClass`` in the executor, printing progress around it.

    Args:
        loop: event loop used to schedule the blocking call.
        executor_processes: executor in which the constructor runs.
        i: task number, used only in the printed messages.
    """
    print(f"[TASK {i}] Initializing Abck class")
    # run_in_executor returns an awaitable, so the event loop stays free
    # while the constructor runs elsewhere.
    instance = await loop.run_in_executor(executor_processes, TestClass)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")


async def main():
    """Run 99 ``task`` coroutines concurrently on a 5-process pool.

    The executor is used as a context manager, so ``shutdown(wait=True)`` is
    called when ``main`` leaves the block. That happens either after all tasks
    finish or when ``asyncio.run`` cancels ``main`` after Ctrl+C. Previously
    the pool was never shut down and cleanup was left to interpreter exit.
    """
    # The asyncio docs recommend get_running_loop() inside coroutines: it
    # returns the loop running this coroutine and never creates a new one.
    loop_ = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=5) as executor_processes:
        tasks = [task(loop_, executor_processes, i) for i in range(1, 100)]
        # On cancellation, leaving the block still runs shutdown(wait=True).
        # That call waits for work items that were not cancelled.
        await asyncio.gather(*tasks)


if __name__ == '__main__':
    # Script entry point. With the default SIGINT handler, Ctrl+C raises
    # KeyboardInterrupt out of asyncio.run(); it is caught here so the main
    # process prints a message instead of a traceback. Nothing here affects
    # the pool's worker processes.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("ctrl + c")
    finally:
        # Printed on normal completion, after Ctrl+C, and before any other
        # exception propagates.
        print('Program finished')

以下是在所有任务和进程完成之前按下 Ctrl+C 后得到的错误日志:

Fatal Python error: Fatal Python error: init_import_sizeinit_import_size: : Failed to import the site moduleFailed to import the site module

Python runtime state: Python runtime state: initializedinitialized

Traceback (most recent call last):
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
    main()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
    main()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
    main()
    main()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
    known_paths = venv(known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
    known_paths = venv(known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
    known_paths = venv(known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
    known_paths = venv(known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
    addsitepackages(known_paths, [sys.prefix])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
    addsitepackages(known_paths, [sys.prefix])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
    main()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
    addsitepackages(known_paths, [sys.prefix])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
    addsitepackages(known_paths, [sys.prefix])
    addsitedir(sitedir, known_paths)
    addsitedir(sitedir, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
    addsitedir(sitedir, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
    addpackage(sitedir, name, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
    known_paths = venv(known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
    addpackage(sitedir, name, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
    addsitedir(sitedir, known_paths)
    exec(line)
    exec(line)
    addpackage(sitedir, name, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
  File "<string>", line 1, in <module>
  File "<string>", line 1, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 2, in <module>
    exec(line)
  File "<string>", line 1, in <module>
    from contextlib import contextmanager
    from . import abc
    addpackage(sitedir, name, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 6, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/abc.py", line 4, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
    addsitepackages(known_paths, [sys.prefix])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
    from contextlib import contextmanager
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 165, in <module>
    exec(line)
  File "<string>", line 1, in <module>
    addsitedir(sitedir, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
    from contextlib import contextmanager
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 5, in <module>
    addpackage(sitedir, name, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 160, in addpackage
    f = io.TextIOWrapper(io.open_code(fullname))
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 779, in exec_module
  File "<frozen importlib._bootstrap_external>", line 911, in get_code
  File "<frozen importlib._bootstrap_external>", line 580, in _compile_bytecode
KeyboardInterrupt
    from . import machinery
KeyboardInterrupt
    from functools import wraps
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/functools.py", line 438, in <module>
    from collections import deque
    class _AsyncGeneratorContextManager(_GeneratorContextManagerBase,
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/collections/__init__.py", line 21, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/abc.py", line 85, in __new__
    cls = super().__new__(mcls, name, bases, namespace, **kwargs)
KeyboardInterrupt
    from operator import itemgetter as _itemgetter, eq as _eq
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 779, in exec_module
    _CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/collections/__init__.py", line 394, in namedtuple
  File "<frozen importlib._bootstrap_external>", line 911, in get_code
  File "<frozen importlib._bootstrap_external>", line 580, in _compile_bytecode
Exception in thread QueueManagerThread:
Traceback (most recent call last):
KeyboardInterrupt
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    exec(s, namespace)
  File "<string>", line 1, in <module>
ctrl + c
Program finished
KeyboardInterrupt
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 394, in _queue_management_worker
    work_item.future.set_exception(bpe)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 539, in set_exception
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <Future at 0x7ffed1f2f250 state=cancelled>
4

1 个回答

3

Windows 解决方案

如果您在 Windows 上运行,CTRL-C 的中断处理似乎无法正常作用于多进程池。下面的方法有点笨拙,但似乎是常见做法。

思路是在多进程池的每个进程中初始化一个全局变量 ctrl_c_entered,初始值为 False。我为您的 TestClass 类补充了一个方法 foo,它就是要调用的工作函数。foo 在被调用时必须:

  1. 检查全局标志 ctrl_c_entered,如果它为 True,则立即返回。
  2. 自行处理 KeyboardInterrupt:一旦发生这样的中断,就必须将全局标志 ctrl_c_entered 设置为 True 并返回。
  3. 更新:不过,CTRL-C 也可能在池进程尚未把控制权交给工作函数时被按下,例如它正在从输入队列中获取下一个要运行的任务。这种情况下,KeyboardInterrupt 异常不会被任何 try/except 捕获。因此,我们需要为池中的每个进程设置一个 SIGINT 中断处理程序,由它将 ctrl_c_entered 标志设置为 True。但这也意味着,在上面的第 2 步中必须临时恢复原来的默认 SIGINT 处理程序,才能捕获 KeyboardInterrupt 异常。

您还必须让所有已提交的异步任务执行完毕。因此我们在主进程中也设置了一个 signal.SIGINT 中断处理程序:一旦按下 CTRL-C,它就将主进程的全局标志 ctrl_c_entered 设置为 True,这样程序就不会跳出 asyncio.run(main()) 语句。长时间运行的 asyncio 任务必须检查这个 ctrl_c_entered 标志,并在其为 True 时终止。

import asyncio
from concurrent.futures import ProcessPoolExecutor
import signal
import time
from functools import wraps

def handle_ctrl_c(func):
    """Decorate a pool work function so Ctrl+C is reported, not raised.

    Inside a pool worker, SIGINT normally goes to ``pool_ctrl_c_handler``,
    which only sets the worker-global ``ctrl_c_entered`` flag. The wrapper:

    * returns a ``KeyboardInterrupt`` instance immediately, without calling
      ``func``, if the flag is already set;
    * otherwise reinstalls ``default_sigint_handler`` while ``func`` runs, so
      Ctrl+C raises KeyboardInterrupt there. It catches that exception, sets
      the flag and returns a ``KeyboardInterrupt`` instance;
    * always reinstalls ``pool_ctrl_c_handler`` afterwards.

    The exception is returned rather than raised, so the caller receives it
    as an ordinary result value.
    """
    @wraps(func)
    def guarded(*args, **kwargs):
        global ctrl_c_entered
        if ctrl_c_entered:
            # Already interrupted: skip the work entirely.
            return KeyboardInterrupt()
        # While func runs, let Ctrl+C raise KeyboardInterrupt again.
        signal.signal(signal.SIGINT, default_sigint_handler)
        try:
            return func(*args, **kwargs)
        except KeyboardInterrupt:
            ctrl_c_entered = True
            return KeyboardInterrupt()
        finally:
            # Between calls, go back to flag-only handling.
            signal.signal(signal.SIGINT, pool_ctrl_c_handler)

    return guarded


class TestClass:
    """Demo class whose ``foo`` is executed inside a pool worker."""

    def __init__(self) -> None:
        self.value1, self.value2 = 1, 2

    @handle_ctrl_c
    def foo(self, i):
        """Sleep for one second, then return ``i`` squared.

        Because of ``handle_ctrl_c``, a Ctrl+C makes this return a
        ``KeyboardInterrupt`` instance instead of raising.
        """
        time.sleep(1)
        squared = i ** 2
        return squared

async def task(loop, executor_processes, i):
    """Run ``TestClass().foo(i)`` in the pool unless Ctrl+C was already seen.

    Returns foo's result. If the main-process flag ``ctrl_c_entered`` is
    already set, returns a ``KeyboardInterrupt`` instance (returned, not
    raised) without submitting any work.
    """
    # Stop submitting new work once Ctrl+C has been pressed. A genuinely
    # long-running task should repeat this check periodically.
    if ctrl_c_entered:
        return KeyboardInterrupt()
    print(f"[TASK {i}] Initializing Abck class")
    worker_call = TestClass().foo
    result = await loop.run_in_executor(executor_processes, worker_call, i)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")
    return result

def pool_ctrl_c_handler(*_signal_args, **_unused):
    """SIGINT handler for pool workers: record the interrupt instead of raising.

    Accepts and ignores whatever arguments ``signal`` passes, and sets the
    worker-global ``ctrl_c_entered`` flag.
    """
    global ctrl_c_entered
    ctrl_c_entered = True

def init_pool():
    """Initializer run in each pool worker: clear the flag and trap SIGINT.

    ``signal.signal`` returns the previously installed handler. It is kept in
    ``default_sigint_handler`` so ``handle_ctrl_c`` can reinstate it while a
    work function runs.
    """
    global ctrl_c_entered, default_sigint_handler
    ctrl_c_entered = False
    sigint = signal.SIGINT
    default_sigint_handler = signal.signal(sigint, pool_ctrl_c_handler)

async def main():
    """Run 99 tasks on a 5-worker pool and print their results.

    Every worker runs ``init_pool`` when it starts. The executor is used as a
    context manager, so ``shutdown(wait=True)`` runs as soon as the tasks are
    done. Previously the pool was never shut down and cleanup was left to
    interpreter exit.
    """
    # The asyncio docs recommend get_running_loop() inside coroutines: it
    # returns the loop running this coroutine and never creates a new one.
    loop_ = asyncio.get_running_loop()
    executor_processes = ProcessPoolExecutor(max_workers=5, initializer=init_pool)
    with executor_processes:
        tasks = [task(loop_, executor_processes, i) for i in range(1, 100)]
        results = await asyncio.gather(*tasks)
    # Results are plain values or KeyboardInterrupt instances returned by
    # task() / handle_ctrl_c after Ctrl+C.
    print(results)

def ctrl_c_handler(*_signal_args, **_unused):
    """Main-process SIGINT handler: set ``ctrl_c_entered`` instead of raising.

    While this handler is installed, Ctrl+C does not break out of
    ``asyncio.run``. ``task`` checks the flag and stops submitting new work.
    """
    global ctrl_c_entered
    ctrl_c_entered = True

if __name__ == '__main__':
    # task() reads this main-process flag, so it must exist before main() runs.
    ctrl_c_entered = False
    # Replace the default SIGINT behaviour (raising KeyboardInterrupt) with a
    # handler that only sets the flag. asyncio.run() is therefore not
    # interrupted, and the already-submitted tasks are allowed to finish.
    signal.signal(signal.SIGINT, ctrl_c_handler)
    asyncio.run(main())
    print('Program finished')

输出:

[TASK 1] Initializing Abck class
[TASK 2] Initializing Abck class
[TASK 3] Initializing Abck class
[TASK 4] Initializing Abck class
[TASK 5] Initializing Abck class
[TASK 6] Initializing Abck class
[TASK 7] Initializing Abck class
[TASK 8] Initializing Abck class
[TASK 9] Initializing Abck class
[TASK 10] Initializing Abck class
[TASK 11] Initializing Abck class
[TASK 12] Initializing Abck class
[TASK 13] Initializing Abck class
[TASK 14] Initializing Abck class
[TASK 15] Initializing Abck class
[TASK 16] Initializing Abck class
[TASK 17] Initializing Abck class
[TASK 18] Initializing Abck class
[TASK 19] Initializing Abck class
[TASK 1] Finished
[TASK 2] Finished
[TASK 3] Finished
[TASK 4] Finished
[TASK 5] Finished
[TASK 6] Finished
[TASK 7] Finished
[TASK 9] Finished
[TASK 8] Finished
[TASK 10] Finished
ctrl + c
ctrl + c
ctrl + c
ctrl + c
ctrl + c
[TASK 13] Finished
[TASK 16] Finished
[TASK 17] Finished
[TASK 18] Finished
[TASK 19] Finished
[TASK 14] Finished
[TASK 12] Finished
[TASK 11] Finished
[TASK 15] Finished
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt()]

Linux 及其他使用 fork 的平台的解决方案

这种情况更简单,因为中断处理基本上能正常作用于多进程池。最简单的做法同样是为每个池进程初始化一个全局标志 running,工作函数定期检查它,并在其为 False 时终止。每个池进程都设置一个 CTRL-C 处理程序,在用户按下 CTRL-C 时将 running 设置为 False,这样就能终止所有已经在运行的任务。主进程只需简单地处理 KeyboardInterrupt 异常即可:

import asyncio
from concurrent.futures import ProcessPoolExecutor
import signal
import time

class TestClass:
    """Demo class whose ``foo`` is executed inside a pool worker."""

    def __init__(self) -> None:
        self.value1, self.value2 = 1, 2

    def foo(self):
        """Simulate ~2 s of work as 20 sleeps of 0.1 s, stopping early on Ctrl+C.

        Before each sleep it checks the worker-global ``running`` flag, which
        ``init_pool`` sets and ``ctrl_c_handler`` clears. Once the flag is
        false, the method returns immediately. It always returns ``None``.
        """
        slices_left = 20
        while slices_left:
            if not running:
                return
            time.sleep(.1)
            slices_left -= 1

async def task(loop, executor_processes, i):
    """Run ``TestClass().foo()`` in the pool, printing before and after.

    Args:
        loop: event loop used to schedule the blocking call.
        executor_processes: pool that executes ``foo``.
        i: task number, used only in the printed messages.
    """
    print(f"[TASK {i}] Initializing Abck class")
    bound_foo = TestClass().foo
    # The awaited result (None for this demo foo) is not used yet.
    outcome = await loop.run_in_executor(executor_processes, bound_foo)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")

def ctrl_c_handler(*_signal_args, **_unused):
    """SIGINT handler for pool workers: clear ``running`` instead of raising.

    ``TestClass.foo`` polls ``running`` and returns early once it is False.
    """
    global running
    running = False

def init_pool():
    """Initializer run once in every pool worker.

    Marks the worker as running, then installs ``ctrl_c_handler`` for SIGINT
    so that Ctrl+C clears ``running`` instead of raising KeyboardInterrupt
    inside the worker.
    """
    global running
    running = True
    sigint = signal.SIGINT
    signal.signal(sigint, ctrl_c_handler)

async def main():
    """Run 99 tasks concurrently on a 5-worker pool.

    Every worker runs ``init_pool`` when it starts. The executor is used as a
    context manager, so ``shutdown(wait=True)`` is called when ``main`` leaves
    the block. That happens either normally or when ``asyncio.run`` cancels
    ``main`` after Ctrl+C. Previously the pool was never shut down and cleanup
    was left to interpreter exit.
    """
    # The asyncio docs recommend get_running_loop() inside coroutines: it
    # returns the loop running this coroutine and never creates a new one.
    loop_ = asyncio.get_running_loop()
    executor_processes = ProcessPoolExecutor(max_workers=5, initializer=init_pool)
    with executor_processes:
        tasks = [task(loop_, executor_processes, i) for i in range(1, 100)]
        # On cancellation, shutdown(wait=True) waits for calls that were not
        # cancelled. The workers' own SIGINT handler makes foo() return early.
        await asyncio.gather(*tasks)

if __name__ == '__main__':
    # The guard stops worker processes created with the "spawn" start method
    # (the default on Windows, and on macOS since Python 3.8) from re-running
    # the whole program when they import this module.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # The main process keeps the default SIGINT handler, so Ctrl+C
        # surfaces here. The workers handle SIGINT themselves (init_pool).
        print("ctrl + c")
    print('Program finished')
回答于 2021-08-29T12:32:52.823