6

我有一个关于如何尽可能快地计算 numpy 距离的问题,

def getR1(VVm,VVs,HHm,HHs):
    t0=time.time()
    R=VVs.flatten()[numpy.newaxis,:]-VVm.flatten()[:,numpy.newaxis]
    R*=R
    R1=HHs.flatten()[numpy.newaxis,:]-HHm.flatten()[:,numpy.newaxis]
    R1*=R1
    R+=R1
    del R1
    print "R1\t",time.time()-t0, R.shape, #11.7576191425 (108225, 10500) 
    print numpy.max(R) #4176.26290975
    # uses 17.5Gb ram
    return R


def getR2(VVm,VVs,HHm,HHs):
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    deltas = precomputed_flat[None,:,:] - measured_flat[:, None, :]
    #print time.time()-t0, deltas.shape # 5.861109972 (108225, 10500, 2)
    R = numpy.einsum('ijk,ijk->ij', deltas, deltas)
    print "R2\t",time.time()-t0,R.shape, #14.5291359425 (108225, 10500)
    print numpy.max(R) #4176.26290975
    # uses 26Gb ram
    return R


def getR3(VVm,VVs,HHm,HHs):
    from numpy.core.umath_tests import inner1d
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    deltas = precomputed_flat[None,:,:] - measured_flat[:, None, :]
    #print time.time()-t0, deltas.shape # 5.861109972 (108225, 10500, 2)
    R = inner1d(deltas, deltas)
    print "R3\t",time.time()-t0, R.shape, #12.6972110271 (108225, 10500)
    print numpy.max(R) #4176.26290975
    #Uses 26Gb
    return R


def getR4(VVm,VVs,HHm,HHs):
    from scipy.spatial.distance import cdist
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    R=spdist.cdist(precomputed_flat,measured_flat, 'sqeuclidean') #.T
    print "R4\t",time.time()-t0, R.shape, #17.7022118568 (108225, 10500)
    print numpy.max(R) #4176.26290975
    # uses 9 Gb ram
    return R

def getR5(VVm,VVs,HHm,HHs):
    from scipy.spatial.distance import cdist
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    R=spdist.cdist(precomputed_flat,measured_flat, 'euclidean') #.T
    print "R5\t",time.time()-t0, R.shape, #15.6070930958 (108225, 10500)
    print numpy.max(R) #64.6240118667
    # uses only 9 Gb ram
    return R

def getR6(VVm,VVs,HHm,HHs):
    from scipy.weave import blitz
    t0=time.time()
    R=VVs.flatten()[numpy.newaxis,:]-VVm.flatten()[:,numpy.newaxis]
    blitz("R=R*R") # R*=R
    R1=HHs.flatten()[numpy.newaxis,:]-HHm.flatten()[:,numpy.newaxis]
    blitz("R1=R1*R1") # R1*=R1
    blitz("R=R+R1") # R+=R1
    del R1
    print "R6\t",time.time()-t0, R.shape, #11.7576191425 (108225, 10500) 
    print numpy.max(R) #4176.26290975
    return R

导致以下时间:

R1  11.7737319469 (108225, 10500) 4909.66881791
R2  15.1279799938 (108225, 10500) 4909.66881791
R3  12.7408981323 (108225, 10500) 4909.66881791
R4  17.3336868286 (10500, 108225) 4909.66881791
R5  15.7530870438 (10500, 108225) 70.0690289494
R6  11.670968771 (108225, 10500) 4909.66881791

虽然最后一个给出 sqrt((VVm-VVs)^2+(HHm-HHs)^2),而其他给出 (VVm-VVs)^2+(HHm-HHs)^2,这并不重要,因为否则在我的代码中,我为每个 i 取 R[i,:] 的最小值,并且 sqrt 无论如何都不会影响最小值,(如果我对距离感兴趣,我只取 sqrt(value),而不是在整个阵列上执行 sqrt,因此实际上没有时间差异。

问题仍然存在:为什么第一个解决方案是最好的,(第二个和第三个较慢的原因是因为 deltas=... 需要 5.8 秒,(这也是这两种方法需要 26Gb 的原因)),为什么skeuclidean 比 euclidean 慢?

sqeuclidean 应该只做 (VVm-VVs)^2+(HHm-HHs)^2,而我认为它做了一些不同的事情。任何人都知道如何找到该方法的源代码(C 或底部的任何内容)?我认为它确实 sqrt((VVm-VVs)^2+(HHm-HHs)^2)^2 (我能想到它为什么会比 (VVm-VVs)^2+(HHm-HHs) 慢的唯一原因) ^2 - 我知道这是一个愚蠢的理由,有人有更合乎逻辑的理由吗?)

由于我对 C 一无所知,我将如何将其与 scipy.weave 内联?并且该代码是否可以像使用 python 一样正常编译?还是我需要为此安装特殊的东西?

编辑:好的,我用 scipy.weave.blitz(R6 方法)进行了尝试,这稍微快了一点,但我认为比我了解更多 C 的人仍然可以提高这个速度?我只是取了格式为 a+=b 或 *= 的行,并查看了它们在 C 中的情况,并将它们放在 blitz 语句中,但我想如果我将带有 flatten 和 newaxis 的语句放入C 也是如此,它也应该更快,但我不知道我该怎么做(知道 C 的人可能会解释吗?)。现在,闪电战的东西和我的第一种方法之间的差异还不足以真正由 C 和 numpy 引起,我猜?

我想像 deltas=... 这样的其他方法也可以更快,当我将它放入 C 时?

4

1 回答 1

7

每当您有乘法和求和时,请尝试使用其中一个点积函数或np.einsum. 由于您正在预先分配数组,而不是为水平和垂直坐标设置不同的数组,因此将它们堆叠在一起:

precomputed_flat = np.column_stack((svf.flatten(), shf.flatten()))
measured_flat = np.column_stack((VVmeasured.flatten(), HHmeasured.flatten()))
deltas = precomputed_flat - measured_flat[:, None, :]

从这里开始,最简单的是:

dist = np.einsum('ijk,ijk->ij', deltas, deltas)

您也可以尝试以下方法:

from numpy.core.umath_tests import inner1d
dist = inner1d(deltas, deltas)

当然还有 SciPy 的空间模块cdist

from scipy.spatial.distance import cdist
dist = cdist(precomputed_flat, measured_flat, 'euclidean')

编辑 我不能在这么大的数据集上运行测试,但这些时间是相当有启发性的:

len_a, len_b = 10000, 1000

a = np.random.rand(2, len_a)
b =  np.random.rand(2, len_b)
c = np.random.rand(len_a, 2)
d = np.random.rand(len_b, 2)

In [3]: %timeit a[:, None, :] - b[..., None]
10 loops, best of 3: 76.7 ms per loop

In [4]: %timeit c[:, None, :] - d
1 loops, best of 3: 221 ms per loop

对于上述较小的数据集,我可以通过在内存中以不同方式排列数据来稍微加快您的方法的速度scipy.spatial.distance.cdist并将其与 匹配inner1d

precomputed_flat = np.vstack((svf.flatten(), shf.flatten()))
measured_flat = np.vstack((VVmeasured.flatten(), HHmeasured.flatten()))
deltas = precomputed_flat[:, None, :] - measured_flat

import scipy.spatial.distance as spdist
from numpy.core.umath_tests import inner1d

In [13]: %timeit r0 = a[0, None, :] - b[0, :, None]; r1 = a[1, None, :] - b[1, :, None]; r0 *= r0; r1 *= r1; r0 += r1
10 loops, best of 3: 146 ms per loop

In [14]: %timeit deltas = (a[:, None, :] - b[..., None]).T; inner1d(deltas, deltas)
10 loops, best of 3: 145 ms per loop

In [15]: %timeit spdist.cdist(a.T, b.T)
10 loops, best of 3: 124 ms per loop

In [16]: %timeit deltas = a[:, None, :] - b[..., None]; np.einsum('ijk,ijk->jk', deltas, deltas)
10 loops, best of 3: 163 ms per loop
于 2013-07-08T14:05:52.537 回答