I am working on a project in which I plan to use joblib's shared-memory parallel functions to improve efficiency. However, I also plan to carry out a parametric study of the program by running the process many times with different parameters (i.e. without shared memory). I would like to know whether this is possible in Python/joblib.
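To make the question concrete, here is a minimal self-contained sketch of the layout I have in mind (run_case and add_row are placeholder names, not my real code): an outer process-based Parallel for the independent parameter runs, and an inner Parallel with require='sharedmem' for the shared-memory loop inside each run.

import numpy as np
from joblib import Parallel, delayed

def run_case(amplitude):            # hypothetical per-parameter job
    field = np.zeros((50, 50, 50))  # private to this process

    def add_row(i):
        field[i, :, :] += amplitude  # threads share field; each i is a distinct row, so no races

    # inner level: require='sharedmem' forces the threading backend,
    # so all workers mutate the same array within this process
    Parallel(n_jobs=2, require='sharedmem')(delayed(add_row)(i) for i in range(50))
    return field.sum()

if __name__ == "__main__":
    # outer level: default loky backend, one process per parameter, nothing shared
    results = Parallel(n_jobs=3)(delayed(run_case)(a) for a in (0.5, 1.0, 2.0))
    print(results)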
Edit: 2020-06-19
As another user mentioned, I should clarify what it is in my code that I want to parallelize. Essentially, I have a 3D numpy array representing some physical space, which I populate with a large number of truncated Gaussians (each affecting only a limited number of elements). Full vectorization was not found to speed the code up much, since the bottleneck is memory access, so I want to try parallelizing the loop over the i-th Gaussian centres, in which each iteration adds one centre's contribution to the whole field. (These loops will share variables to some extent.)
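As a tiny check of the shared-memory behaviour I am relying on (placeholder names, not my real code): require='sharedmem' forces joblib's threading backend, so the workers mutate a single array in place.

import numpy as np
from joblib import Parallel, delayed

acc = np.zeros(8)

def bump(k):
    acc[k] = k  # each call writes a distinct slot, so there are no write races

Parallel(n_jobs=2, require='sharedmem')(delayed(bump)(k) for k in range(8))
print(acc)  # [0. 1. 2. 3. 4. 5. 6. 7.]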
The idea behind parallel code within parallel code is that I also want to run a large number of such processes simultaneously on a cluster I can access online, in order to carry out a parameter study of the project's overall performance with respect to an as-yet unspecified metric (see the sketch after the code listing). Those outer loops would therefore be completely independent.

A modified excerpt of the inner loop is posted below. Unfortunately, it does not seem to improve performance, and it is even worse when I do not split the list of Gaussian centres into one array per core; I am currently investigating this.
import numpy as np
import time
from joblib import Parallel, delayed, parallel_backend
from extra_fns import *
t0 = time.perf_counter()  # start the timer; the end of the script reports the elapsed time
nj = 2            # number of joblib workers
set_par = True    # run one of the parallel cases
split_var = True  # pre-split the Gaussian centres into one chunk per worker
# define 3d grid
nd = 3
nx = 250
ny = 250
nz = 250
x = np.linspace(0, 1, nx)
y = np.linspace(0, 1, ny)
z = np.linspace(0, 1, nz)
# positions of gaussians in space
pgrid = np.linspace(0.05, 0.95 , 20)
Xp, Yp, Zp = np.meshgrid(pgrid,pgrid,pgrid)
xp = Xp.ravel()
yp = Yp.ravel()
zp = Zp.ravel()
Np = np.size(xp)
s = np.ones(Np) # intensity of each gaussian
# compact gaussian representation
sigma = x[1] - x[0]
# truncation radius: distance at which the Gaussian decays to 10e-3 of its peak
max_dist = sigma * np.sqrt(-2 * np.log(10e-3))
# 3D domain:
I = np.zeros((ny, nx, nz))
dx = x[1] - x[0]
dy = y[1] - y[0]
dz = z[1] - z[0]
# footprint half-width of each Gaussian, in grid cells
dix = int(np.ceil(max_dist / dx))
diy = int(np.ceil(max_dist / dy))
diz = int(np.ceil(max_dist / dz))
def run_test(set_par, split_var, xp, yp, zp, s):
    def add_loc_gaussian(i):
        # nearest grid indices of the i-th Gaussian centre
        ix = round((xp[i] - x[0]) / dx)
        iy = round((yp[i] - y[0]) / dy)
        iz = round((zp[i] - z[0]) / dz)
        # index ranges of the truncated footprint, clipped to the domain
        iix = np.arange(max(0, ix - dix), min(nx, ix + dix), 1, dtype=int)
        iiy = np.arange(max(0, iy - diy), min(ny, iy + diy), 1, dtype=int)
        iiz = np.arange(max(0, iz - diz), min(nz, iz + diz), 1, dtype=int)
        # axis-aligned distances from the centre
        ddx = dx * iix - xp[i]
        ddy = dy * iiy - yp[i]
        ddz = dz * iiz - zp[i]
        # separable 1D Gaussian factors
        gx = np.exp(-1 / (2 * sigma ** 2) * ddx ** 2)
        gy = np.exp(-1 / (2 * sigma ** 2) * ddy ** 2)
        gz = np.exp(-1 / (2 * sigma ** 2) * ddz ** 2)
        gx = gx[np.newaxis, :, np.newaxis]
        gy = gy[:, np.newaxis, np.newaxis]
        gz = gz[np.newaxis, np.newaxis, :]
        # accumulate this Gaussian's contribution onto the shared field
        I[np.ix_(iiy, iix, iiz)] += s[i] * gy * gx * gz
    if set_par and split_var:  # case 1: one chunk of centres per worker
        mp = int(Np / nj)  # hard-coded below for two cores
        xp_list = [xp[:mp], xp[mp:]]
        yp_list = [yp[:mp], yp[mp:]]
        zp_list = [zp[:mp], zp[mp:]]
        sp_list = [s[:mp], s[mp:]]

        def core_loop(j):
            xpt = xp_list[j]
            ypt = yp_list[j]
            zpt = zp_list[j]
            spt = sp_list[j]

            def add_loc_gaussian_s(i):
                # same kernel as add_loc_gaussian, applied to this worker's chunk
                ix = round((xpt[i] - x[0]) / dx)
                iy = round((ypt[i] - y[0]) / dy)
                iz = round((zpt[i] - z[0]) / dz)
                iix = np.arange(max(0, ix - dix), min(nx, ix + dix), 1, dtype=int)
                iiy = np.arange(max(0, iy - diy), min(ny, iy + diy), 1, dtype=int)
                iiz = np.arange(max(0, iz - diz), min(nz, iz + diz), 1, dtype=int)
                ddx = dx * iix - xpt[i]
                ddy = dy * iiy - ypt[i]
                ddz = dz * iiz - zpt[i]
                gx = np.exp(-1 / (2 * sigma ** 2) * ddx ** 2)
                gy = np.exp(-1 / (2 * sigma ** 2) * ddy ** 2)
                gz = np.exp(-1 / (2 * sigma ** 2) * ddz ** 2)
                gx = gx[np.newaxis, :, np.newaxis]
                gy = gy[:, np.newaxis, np.newaxis]
                gz = gz[np.newaxis, np.newaxis, :]
                I[np.ix_(iiy, iix, iiz)] += spt[i] * gy * gx * gz

            for i in range(np.size(xpt)):
                add_loc_gaussian_s(i)

        Parallel(n_jobs=2, require='sharedmem')(delayed(core_loop)(j) for j in range(2))
    elif set_par:  # case 2: parallelize directly over all centres
        Parallel(n_jobs=nj, require='sharedmem')(delayed(add_loc_gaussian)(i) for i in range(Np))
    else:  # case 3: plain serial loop
        for i in range(Np):
            add_loc_gaussian(i)
run_test(set_par, split_var, xp, yp, zp, s)
print("Time taken: {} s".format(time.perf_counter()))