下面是我一直使用的一个简单实用程序。
########### Start of Utility Code ###########
import os
import sys
import traceback
from concurrent import futures
from functools import partial
def catch(fn):
def wrap(*args, **kwargs):
result = None
try:
result = fn(*args, **kwargs)
except Exception as err:
type_, value_, traceback_ = sys.exc_info()
return None, (
args,
"".join(traceback.format_exception(type_, value_, traceback_)),
)
else:
return result, (args, None)
return wrap
def top_level_wrap(fn, arg_tuple):
args, kwargs = arg_tuple
return fn(*args, *kwargs)
def create_processes(fn, values, handle_error, handle_success):
cores = os.cpu_count()
max_workers = 2 * cores + 1
to_exec = partial(top_level_wrap, fn)
with futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
for result, error in executor.map(to_exec, values):
args, tb = error
if tb is not None:
handle_error(args, tb)
else:
handle_success(result)
########### End of Utility Code ###########
示例用法 -
######### Start of example usage ###########
import time
@catch
def fail_when_5(val):
time.sleep(val)
if val == 5:
raise Exception("Error - val was 5")
else:
return f"No error val is {val}"
def handle_error(args, tb):
print("args is", args)
print("TB is", tb)
def top_level(val, val_2, test=None, test2="ok"):
print(val_2, test, test2)
return fail_when_5(val)
handle_success = print
if __name__ == "__main__":
# SHAPE -> ( (args, kwargs), (args, kwargs), ... )
values = tuple(
((x, x + 1), {"test": f"t_{x+2}", "test2": f"t_{x+3}"}) for x in range(10)
)
create_processes(top_level, values, handle_error, handle_success)
######### End of example usage ###########