1

我正在尝试优化评估相对昂贵的功能。该函数在一系列数据点上运行,并且可以并行评估。每个数据点评估都需要访问全局数据,因此我使用 ctype 和多处理数组在进程之间共享数据。我正在使用多处理 Pool.map() 并行评估整个数据集。

我正在使用 scikit-optimize 贝叶斯高斯过程最小化器来优化基于输入的函数。该输入对应于创建预定大小的数据集。

我有一个奇怪的问题,如果我运行 GP 最小化程序进行 10 次以上的调用,但只有当我计算函数的并行部分的数据集的大小很大时,程序才会挂起。否则,我可以毫无问题地运行最多 100 次调用的优化器。

基本的脚手架是这样的:

'''

from multiprocessing import Pool, Array
import ctypes

def func(x):

    size = 1000
    dataset = np.ctypeslib.as_array(Array(ctypes.c_double, size).get_obj())

    # I know this looks weird, but is actually an array input
    x_shared = np.ctypeslib.asarray(Array(ctypes.c_double, 1).get_obj())

    # create the data based on the input value
    pool = Pool()
    pool.map(create_data, range(size))
    pool.close()
    pool.join()

    # evaluate the data 
    pool = Pool()
    pool.map(evaluate_data, range(size))
    pool.join()
    pool.close()

    return np.mean(dataset)

if __name__ == '__main__':
    gp_minimize(func, ncalls = 10)

我意识到上面的代码不一定对双池有意义,但对于我的实际程序来说它是必要的,因为它太大而无法发布。

当我中断被阻止的程序时,我收到以下错误:

'''

File "ig_func.py", line 260, in opt_func
    pool2.map(compute_ig, range(nlpts))
  File "/opt/anaconda3/envs/obspy/lib/python3.7/multiprocessing/pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/opt/anaconda3/envs/obspy/lib/python3.7/multiprocessing/pool.py", line 651, in get
    self.wait(timeout)
  File "/opt/anaconda3/envs/obspy/lib/python3.7/multiprocessing/pool.py", line 648, in wait
    self._event.wait(timeout)
  File "/opt/anaconda3/envs/obspy/lib/python3.7/threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
  File "/opt/anaconda3/envs/obspy/lib/python3.7/threading.py", line 296, in wait
    waiter.acquire()

'''

当地图等待获取资源时,程序似乎卡住了。同样,当我正在操作的数据集的大小很小时,它不会引起这个问题。有任何想法吗?

4

0 回答 0