前言:是的,您的问题实际上有一个解决方案。请参阅底部的代码。但是,在我到达那里之前,我将进行一些解释。
我认为这里问题的根源是内存访问。首先,虽然我没有对它进行严格的研究,但我怀疑可以对 Julia 的底层代码进行适度数量的改进,以改进它在并行处理中处理内存访问的方式。尽管如此,在这种情况下,我怀疑基本代码的任何潜在问题(如果确实存在的话)并没有太大的错。相反,我认为仔细考虑代码中到底发生了什么以及它对内存访问意味着什么是有用的。
在 Julia 中工作时要记住的一个关键点是它以列优先顺序存储数组。也就是说,它将它们存储为彼此堆叠的列。这也推广到维度 > 2。看到这个Julia 性能提示中非常有用的部分,以获取更多信息。这意味着在单个列中逐行访问速度很快。但是,如果您需要在列中跳跃,那么您就会遇到麻烦。是的,访问 ram 内存可能相对较快,但访问缓存内存要快得多,因此,如果您的代码允许将单个列左右从 ram 加载到缓存中然后处理,那么您将做很多事情比需要在内存和缓存之间进行大量交换要好。在您的代码中,您在计算之间从一列切换到另一列,就像没人做生意一样。例如,在您的pmap
每个进程都会获得共享数组的不同列来处理。然后,每个人都沿着该列的行向下移动并修改其中的值。但是,由于它们试图彼此并行工作,并且整个阵列太大而无法放入您的缓存中,因此内存和缓存之间会发生大量交换,这确实会减慢您的速度。理论上,也许可以设计一个足够聪明的底层内存管理系统来解决这个问题,但我真的不知道——这超出了我的薪酬等级。当然,您对其他对象的访问也会发生同样的事情。
并行化时要记住的另一件事是触发器(即计算机计算)与读/写操作的比率。触发器倾向于很好地并行化,您可以让不同的内核、进程等对它们自己的小缓存中保存的数据位进行自己的小计算。但是,读/写操作不能很好地并行化。可以做一些事情来设计硬件系统来改进这一点。但一般来说,如果你有一个给定的计算机系统,比如说,两个内核,然后再添加四个内核,你执行触发器的能力将增加三倍,但你从 ram 读取/写入数据的能力不会t 真的提高很多。(注意:这过于简单化,很大程度上取决于您的系统)。然而,一般来说,你的触发器与读/写的比率越高,您可以从并行性中获得的好处越多。在您的情况下,您的代码涉及相当数量的读/写(所有这些对不同数组的访问),用于相对较少的触发器(一些乘法和加法)。这只是要记住的事情。
幸运的是,如果编写正确,您的案例可以从并行性中获得一些良好的加速。根据我对 Julia 的经验,我所有最成功的并行性都来自于我可以分解数据并让工作人员分别处理数据块。你的案子恰好符合这一点。下面是我编写的一些代码示例。您可以看到从一个处理器到三个处理器的速度几乎提高了 3 倍。代码在某些地方有点粗糙,但它至少展示了如何处理这样的事情的总体思路。之后我对代码发表了一些评论。
addprocs(3)
nx = 250;
nz = 250;
nh = 50;
nt = 250;
@everywhere ncol = 100;
model = rand(nz,nx,nh);
data = SharedArray(Float64,nt,ncol);
A = rand(nz,nx,nh);
B = rand(nz,nx,nh);
function distribute_data(X, obj_name_on_worker::Symbol, dim)
size_per_worker = floor(Int,size(X,1) / nworkers())
StartIdx = 1
EndIdx = size_per_worker
for (idx, pid) in enumerate(workers())
if idx == nworkers()
EndIdx = size(X,1)
end
println(StartIdx:EndIdx)
if dim == 3
@spawnat(pid, eval(Main, Expr(:(=), obj_name_on_worker, X[StartIdx:EndIdx,:,:])))
elseif dim == 2
@spawnat(pid, eval(Main, Expr(:(=), obj_name_on_worker, X[StartIdx:EndIdx,:])))
end
StartIdx = EndIdx + 1
EndIdx = EndIdx + size_per_worker - 1
end
end
distribute_data(model, :model, 3)
distribute_data(A, :A, 3)
distribute_data(B, :B, 3)
distribute_data(data, :data, 2)
@everywhere function Dummy(icol,model,data,A,B)
nx = size(model, 2)
nz = size(A,1)
nh = size(model, 3)
for ih = 1:nh
for ix = 1:nx
for iz = 1:nz
data[iz,icol] += A[iz,ix,ih]*B[iz,ix,ih]*model[iz,ix,ih]
end
end
end
end
regular_test() = map((arg)->Dummy(arg,model,data,A,B),[icol for icol = 1:ncol])
function parallel_test()
@everywhere begin
if myid() != 1
map((arg)->Dummy(arg,model,data,A,B),[icol for icol = 1:ncol])
end
end
end
@time regular_test(); # 2.120631 seconds (307 allocations: 11.313 KB)
@time parallel_test(); # 0.918850 seconds (5.70 k allocations: 337.250 KB)
getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))
function recombine_data(Data::Symbol)
Results = cell(nworkers())
for (idx, pid) in enumerate(workers())
Results[idx] = getfrom(pid, Data)
end
return vcat(Results...)
end
@time P_Data = recombine_data(:data); # 0.003132 seconds
P_Data == data ## true
注释
这里的使用SharedArray
是多余的。我只是使用它,因为它很容易就地修改,这就是您的代码最初编写的方式。这让我可以更直接地根据您编写的内容进行工作,而无需进行太多修改。
我没有在计时赛中包含将数据带回的步骤,但是正如您所看到的,在这种情况下,这只是一个微不足道的时间。在其他情况下,它可能不那么微不足道,但数据移动只是您面临的并行问题之一。
通常在进行计时测试时,最好的做法是运行一次函数(为了编译代码),然后再次运行以获取时间。这就是我在这里所做的。
请参阅此 SO帖子,了解我在此处使用的某些功能的灵感来源。