我发现 Haskell 的数组库 Repa 非常有趣,并想做一个简单的程序,尝试了解如何使用它。我还使用列表做了一个简单的实现,事实证明它要快得多。我的主要问题是如何改进下面的 Repa 代码以使其最有效(并且希望也非常易读)。我是使用 Haskell 的新手,我在 Repa 上找不到任何易于理解的教程 [编辑Haskell Wiki上有一个,我写这篇文章时不知何故忘记了],所以不要以为我什么都知道。:) 例如,我不确定何时使用 force 或 deepSeqArray。
该程序用于通过以下方式近似计算球体的体积:
- 指定了球体的中心点和半径,以及已知包含球体的长方体内的等距坐标。
- 该程序获取每个坐标,计算到球体中心的距离,如果它小于球体的半径,则用于将球体的总(近似)体积相加。
下面显示了两个版本,一个使用列表,一个使用 repa。我知道代码效率低下,尤其是对于这个用例,但我的想法是稍后让它变得更复杂。
对于以下值,并使用“ghc -Odph -fllvm -fforce-recomp -rtsopts -threaded”编译,列表版本需要 1.4 秒,而 repa 版本需要 12 秒 +RTS -N1 和 10 秒 +RTS - N2,虽然没有转换火花(我有一台运行 Windows 7 64、GHC 7.0.2 和 llvm 2.8 的双核 Intel 机器(Core 2 Duo E7400 @ 2.8 GHz))。(在下面的 main 中注释掉正确的行以运行其中一个版本。)
感谢您的任何帮助!
import Data.Array.Repa as R
import qualified Data.Vector.Unboxed as V
import Prelude as P
-- Calculate the volume of a sphere by putting it in a bath of coordinates. Generate coordinates (x,y,z) in a cuboid. Then, for each coordinate, check if it is inside the sphere. Sum those coordinates and multiply by the coordinate grid step size to find an approximate volume.
particles = [(0,0,0)] -- used for the list alternative --[(0,0,0),(0,2,0)]
particles_repa = [0,0,0::Double] -- used for the repa alternative, can currently just be one coordinate
-- Radius of the particle
a = 4
-- Generate the coordinates. Could this be done more efficiently, and at the same time simple? In Matlab I would use ndgrid.
step = 0.1 --0.05
xrange = [-10,-10+step..10] :: [Double]
yrange = [-10,-10+step..10]
zrange = [-10,-10+step..10]
-- All coordinates as triples. These are used directly in the list version below.
coords = [(x,y,z) | x <- xrange, y <- yrange, z <- zrange]
---- List code ----
volumeIndividuals = fromIntegral (length particles) * 4*pi*a**3/3
volumeInside = step**3 * fromIntegral (numberInsideParticles particles coords)
numberInsideParticles particles coords = length $ filter (==True) $ P.map (insideParticles particles) coords
insideParticles particles coord = any (==True) $ P.map (insideParticle coord) particles
insideParticle (xc,yc,zc) (xp,yp,zp) = ((xc-xp)^2+(yc-yp)^2+(zc-zp)^2) < a**2
---- End list code ----
---- Repa code ----
-- Put the coordinates in a Nx3 array.
xcoords = P.map (\(x,_,_) -> x) coords
ycoords = P.map (\(_,y,_) -> y) coords
zcoords = P.map (\(_,_,z) -> z) coords
-- Total number of coordinates
num_coords = (length xcoords) ::Int
xcoords_r = fromList (Z :. num_coords :. (1::Int)) xcoords
ycoords_r = fromList (Z :. num_coords :. (1::Int)) ycoords
zcoords_r = fromList (Z :. num_coords :. (1::Int)) zcoords
rcoords = xcoords_r R.++ ycoords_r R.++ zcoords_r
-- Put the particle coordinates in an array, then extend (replicate) this array so that its size becomes the same as that of rcoords
particle = fromList (Z :. (1::Int) :. (3::Int)) particles_repa
particle_slice = slice particle (Z :. (0::Int) :. All)
particle_extended = extend (Z :. num_coords :. All) particle_slice
-- Calculate the squared difference between the (x,y,z) coordinates of the particle and the coordinates of the cuboid.
squared_diff = deepSeqArrays [rcoords,particle_extended] ((force2 rcoords) -^ (force2 particle_extended)) **^ 2
(**^) arr pow = R.map (**pow) arr
xslice = slice squared_diff (Z :. All :. (0::Int))
yslice = slice squared_diff (Z :. All :. (1::Int))
zslice = slice squared_diff (Z :. All :. (2::Int))
-- Calculate the distance between each coordinate and the particle center
sum_squared_diff = [xslice,yslice,zslice] `deepSeqArrays` xslice +^ yslice +^ zslice
-- Do the rest using vector, since I didn't get the repa variant working.
ssd_vec = toVector sum_squared_diff
-- Determine the number of the coordinates that are within the particle (instead of taking the square root to get the distances above, I compare to the square of the radius here, to improve performance)
total_within = fromIntegral (V.length $ V.filter (<a**2) ssd_vec)
--total_within = foldAll (\x acc -> if x < a**2 then acc+1 else acc) 0 sum_squared_diff
-- Finally, calculate an approximation of the volume of the sphere by taking the volume of the cubes with side step, multiplied with the number of coordinates within the sphere.
volumeInside_repa = step**3 * total_within
-- Helper function that shows the size of a 2-D array.
rsize = reverse . listOfShape . (extent :: Array DIM2 Double -> DIM2)
---- End repa code ----
-- Comment out the list or the repa version if you want to time the calculations separately.
main = do
putStrLn $ "Step = " P.++ show step
putStrLn $ "Volume of individual particles = " P.++ show volumeIndividuals
putStrLn $ "Volume of cubes inside particles (list) = " P.++ show volumeInside
putStrLn $ "Volume of cubes inside particles (repa) = " P.++ show volumeInside_repa
编辑:一些背景解释了为什么我写了上面的代码:
我主要在 Matlab 中编写代码,我的性能改进经验主要来自该领域。在 Matlab 中,您通常希望使用直接在矩阵上操作的函数进行计算,以提高性能。我在 Matlab R2010b 中实现上述问题,使用下面显示的矩阵版本需要 0.9 秒,使用嵌套循环需要 15 秒。虽然我知道 Haskell 与 Matlab 非常不同,但我希望在 Haskell 中从使用列表到使用 Repa 数组会提高代码的性能。列表-> Repa 数组-> 向量的转换在那里,因为我不够熟练,无法用更好的东西替换它们。这就是我要求输入的原因。:) 上面的时间数字因此是主观的,因为它可以衡量我的性能超过语言的能力,但它现在对我来说是一个有效的指标,因为决定我将使用什么取决于我是否可以使它工作。
tl;博士:我知道我上面的 Repa 代码可能是愚蠢或病态的,但这是我现在能做的最好的。我希望能够编写更好的 Haskell 代码,我希望你能在这个方向上帮助我(dons 已经这样做了)。:)
function archimedes_simple()
particles = [0 0 0]';
a = 4;
step = 0.1;
xrange = [-10:step:10];
yrange = [-10:step:10];
zrange = [-10:step:10];
[X,Y,Z] = ndgrid(xrange,yrange,zrange);
dists2 = bsxfun(@minus,X,particles(1)).^2+ ...
bsxfun(@minus,Y,particles(2)).^2+ ...
bsxfun(@minus,Z,particles(3)).^2;
inside = dists2 < a^2;
num_inside = sum(inside(:));
disp('');
disp(['Step = ' num2str(step)]);
disp(['Volume of individual particles = ' num2str(size(particles,2)*4*pi*a^3/3)]);
disp(['Volume of cubes inside particles = ' num2str(step^3*num_inside)]);
end
编辑 2:Repa 代码的新的、更快和更简单的版本
我现在已经阅读了更多关于 Repa 的内容,并思考了一下。下面是一个新的 Repa 版本。在这种情况下,我使用 Repa 扩展函数从值列表(类似于 ndgrid 在 Matlab 中的工作方式)将 x、y 和 z 坐标创建为 3-D 数组。然后我映射这些阵列以计算到球形粒子的距离。最后,我折叠生成的 3-D 距离数组,计算球内有多少个坐标,然后将其乘以一个常数因子以获得近似体积。我的算法实现现在更类似于上面的 Matlab 版本,不再有任何转换为向量。
新版本在我的电脑上运行大约 5 秒,比上面有相当大的改进。如果我在编译时使用“线程”,是否与“+RTS -N2”结合使用,时间是相同的,但是线程版本确实最大限度地利用了我计算机的两个内核。然而,我确实看到几滴“-N2”运行到 3.1 秒,但后来无法重现它们。也许它对同时运行的其他进程非常敏感?基准测试时我已经关闭了计算机上的大多数程序,但仍有一些程序在运行,例如后台进程。
如果我们使用“-N2”并添加运行时开关以关闭并行 GC(-qg),时间会持续下降到 ~4.1 秒,并使用 -qa 来“使用操作系统设置线程亲和性(实验性)”,时间缩短到约 3.5 秒。查看使用“+RTS -s”运行程序的输出,使用 -qg 执行的 GC 少得多。
今天下午我会看看我是否可以在8核计算机上运行代码,只是为了好玩。:)
import Data.Array.Repa as R
import Prelude as P
import qualified Data.List as L
-- Calculate the volume of a spherical particle by putting it in a bath of coordinates. Generate coordinates (x,y,z) in a cuboid. Then, for each coordinate, check if it is inside the sphere. Sum those coordinates and multiply by the coordinate grid step size to find an approximate volume.
particles :: [(Double,Double,Double)]
particles = [(0,0,0)]
-- Radius of the spherical particle
a = 4
volume_individuals = fromIntegral (length particles) * 4*pi*a^3/3
-- Generate the coordinates.
step = 0.1
coords_list = [-10,-10+step..10] :: [Double]
num_coords = (length coords_list) :: Int
coords :: Array DIM1 Double
coords = fromList (Z :. (num_coords ::Int)) coords_list
coords_slice :: Array DIM1 Double
coords_slice = slice coords (Z :. All)
-- x, y and z are 3-D arrays, where the same index into each array can be used to find a single coordinate, e.g. (x(i,j,k),y(i,j,k),z(i,j,k)).
x,y,z :: Array DIM3 Double
x = extend (Z :. All :. num_coords :. num_coords) coords_slice
y = extend (Z :. num_coords :. All :. num_coords) coords_slice
z = extend (Z :. num_coords :. num_coords :. All) coords_slice
-- Calculate the squared distance from each coordinate to the center of the spherical particle.
dist2 :: (Double, Double, Double) -> Array DIM3 Double
dist2 particle = ((R.map (squared_diff xp) x) + (R.map (squared_diff yp) y) + (R.map ( squared_diff zp) z))
where
(xp,yp,zp) = particle
squared_diff xi xa = (xa-xi)^2
-- Count how many of the coordinates are within the spherical particle.
num_inside_particle :: (Double,Double,Double) -> Double
num_inside_particle particle = foldAll (\acc x -> if x<a^2 then acc+1 else acc) 0 (force $ dist2 particle)
-- Calculate the approximate volume covered by the spherical particle.
volume_inside :: [Double]
volume_inside = P.map ((*step^3) . num_inside_particle) particles
main = do
putStrLn $ "Step = " P.++ show step
putStrLn $ "Volume of individual particles = " P.++ show volume_individuals
putStrLn $ "Volume of cubes inside each particle (repa) = " P.++ (P.concat . ( L.intersperse ", ") . P.map show) volume_inside
-- As an alternative, y and z could be generated from x, but this was slightly slower in my tests (~0.4 s).
--y = permute_dims_3D x
--z = permute_dims_3D y
-- Permute the dimensions in a 3-D array, (forward, cyclically)
permute_dims_3D a = backpermute (swap e) swap a
where
e = extent a
swap (Z :. i:. j :. k) = Z :. k :. i :. j
新代码的空间分析
下面与 Don Stewart 制作的配置文件类型相同,但使用的是新的 Repa 代码。