我正在尝试优化评估相对昂贵的功能。该函数在一系列数据点上运行,并且可以并行评估。每个数据点评估都需要访问全局数据,因此我使用 ctype 和多处理数组在进程之间共享数据。我正在使用多处理 Pool.map() 并行评估整个数据集。
我正在使用 scikit-optimize 贝叶斯高斯过程最小化器来优化基于输入的函数。该输入对应于创建预定大小的数据集。
我有一个奇怪的问题,如果我运行 GP 最小化程序进行 10 次以上的调用,但只有当我计算函数的并行部分的数据集的大小很大时,程序才会挂起。否则,我可以毫无问题地运行最多 100 次调用的优化器。
基本的脚手架是这样的:
'''
from multiprocessing import Pool, Array
import ctypes
def func(x):
size = 1000
dataset = np.ctypeslib.as_array(Array(ctypes.c_double, size).get_obj())
# I know this looks weird, but is actually an array input
x_shared = np.ctypeslib.asarray(Array(ctypes.c_double, 1).get_obj())
# create the data based on the input value
pool = Pool()
pool.map(create_data, range(size))
pool.close()
pool.join()
# evaluate the data
pool = Pool()
pool.map(evaluate_data, range(size))
pool.join()
pool.close()
return np.mean(dataset)
if __name__ == '__main__':
gp_minimize(func, ncalls = 10)
我意识到上面的代码不一定对双池有意义,但对于我的实际程序来说它是必要的,因为它太大而无法发布。
当我中断被阻止的程序时,我收到以下错误:
'''
File "ig_func.py", line 260, in opt_func
pool2.map(compute_ig, range(nlpts))
File "/opt/anaconda3/envs/obspy/lib/python3.7/multiprocessing/pool.py", line 268, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/opt/anaconda3/envs/obspy/lib/python3.7/multiprocessing/pool.py", line 651, in get
self.wait(timeout)
File "/opt/anaconda3/envs/obspy/lib/python3.7/multiprocessing/pool.py", line 648, in wait
self._event.wait(timeout)
File "/opt/anaconda3/envs/obspy/lib/python3.7/threading.py", line 552, in wait
signaled = self._cond.wait(timeout)
File "/opt/anaconda3/envs/obspy/lib/python3.7/threading.py", line 296, in wait
waiter.acquire()
'''
当地图等待获取资源时,程序似乎卡住了。同样,当我正在操作的数据集的大小很小时,它不会引起这个问题。有任何想法吗?