7

The actual problem I wish to solve is, given a set of N unit vectors and another set of M vectors calculate for each of the unit vectors the average of the absolute value of the dot product of it with every one of the M vectors. Essentially this is calculating the outer product of the two matrices and summing and averaging with an absolute value stuck in-between.

For N and M not too large this is not hard and there are many ways to proceed (see below). The problem is when N and M are large the temporaries created are huge and provide a practical limitation for the provided approach. Can this calculation be done without creating temporaries? The main difficulty I have is due to the presence of the absolute value. Are there general techniques for "threading" such calculations?

As an example consider the following code

N = 7
M = 5

# Create the unit vectors, just so we have some examples,
# this is not meant to be elegant
phi = np.random.rand(N)*2*np.pi
ctheta = np.random.rand(N)*2 - 1
stheta = np.sqrt(1-ctheta**2)
nhat = np.array([stheta*np.cos(phi), stheta*np.sin(phi), ctheta]).T

# Create the other vectors
m = np.random.rand(M,3)

# Calculate the quantity we desire, here using broadcasting.
S = np.average(np.abs(np.sum(nhat*m[:,np.newaxis,:], axis=-1)), axis=0)

This is great, S is now an array of length N and contains the desired results. Unfortunately in the process we have created some potentially huge arrays. The result of

np.sum(nhat*m[:,np.newaxis,:], axis=-1)

is a M X N array. The final result, of course, is only of size N. Start increasing the sizes of N and M and we quickly run into a memory error.

As noted above, if the absolute value were not required then we could proceed as follows, now using einsum()

T = np.einsum('ik,jk,j', nhat, m, np.ones(M)) / M

This works and works quickly even for quite large N and M . For the specific problem I need to include the abs() but a more general solution (perhaps a more general ufunc) would also be of interest.

4

3 回答 3

3

根据一些评论,似乎使用 cython 是最好的方法。我愚蠢地从未考虑过使用cython。事实证明,生成工作代码相对容易。

经过一番搜索,我整理了以下 cython 代码。这不是最通用的代码,可能不是最好的编写方式,并且可能会变得更有效率。即便如此,它只比einsum()原始问题中的代码慢 25%,所以还不错!它已被编写为与原始问题中创建的数组一起显式工作(因此假定输入数组的模式)。
尽管有一些警告,但它确实为原始问题提供了一个相当有效的解决方案,并且可以作为类似情况的起点。

import numpy as np
cimport numpy as np
import cython
DTYPE = np.float64
ctypedef np.float64_t DTYPE_t
cdef inline double d_abs (double a) : return a if a >= 0 else -a

@cython.boundscheck(False)
@cython.wraparound(False)
def process_vectors (np.ndarray[DTYPE_t, ndim=2, mode="fortran"] nhat not None,
                     np.ndarray[DTYPE_t, ndim=2, mode="c"] m not None) :
    if nhat.shape[1] != m.shape[1] :
        raise ValueError ("Arrays must contain vectors of the same dimension")
    cdef Py_ssize_t imax = nhat.shape[0]
    cdef Py_ssize_t jmax = m.shape[0]
    cdef Py_ssize_t kmax = nhat.shape[1] # same as m.shape[1]
    cdef np.ndarray[DTYPE_t, ndim=1] S = np.zeros(imax, dtype=DTYPE)
    cdef Py_ssize_t i, j, k
    cdef DTYPE_t val, tmp
    for i in range(imax) :
        val = 0
        for j in range(jmax) :
            tmp = 0
            for k in range(kmax) :
                tmp += nhat[i,k] * m[j,k]
            val += d_abs(tmp)
        S[i] = val / jmax
    return S
于 2013-07-14T03:02:43.537 回答
1

我认为没有任何简单的方法(除了 Cython 等)来加快您的精确操作。但是您可能需要考虑是否真的需要计算您正在计算的内容。因为如果你可以使用均方根而不是绝对值的平均值,你仍然会以某种方式平均内积的大小,但你可以一次性得到它:

rms = np.sqrt(np.einsum('ij,il,kj,kl,k->i', nhat, nhat, m, m, np.ones(M)/M))

这与执行以下操作相同:

rms_2 = np.sqrt(np.average(np.einsum('ij,kj->ik', nhat, m)**2, axis=-1))

是的,这并不完全符合您的要求,但恐怕它与您使用矢量化方法所获得的一样接近。如果你决定走这条路,看看np.einsumlarge Nand的性能有多好M:当传递太多参数和索引时,它有陷入困境的趋势。

于 2013-07-13T05:44:57.113 回答
0

这有点慢,但不会创建大的中间矩阵。

vals = np.zeros(N)
for i in xrange(N):
    u = nhat[i]
    for v in m:
        vals[i]+=abs(np.dot(u,v))
    vals[i]=vals[i]/M

编辑:将除以 M 移到 for 循环之外。

编辑2:新想法,保留旧想法以供后代和相关评论。

m2 = np.average(m,0)
vals = np.zeros(N)
for i in xrange(N):
    u=nhat[i]
    vals[i]=abs(np.dot(u,m2))

这很快,但有时会给出不同的值,我正在研究为什么但同时它可能会有所帮助。

编辑3:啊,这是绝对价值的东西。唔

>>> S
array([ 0.28620962,  0.65337876,  0.37470707,  0.46500913,  0.49579837,
        0.29348924,  0.27444208,  0.74586928,  0.35789315,  0.3079964 ,
        0.298353  ,  0.42571445,  0.32535728,  0.87505053,  0.25547394,
        0.23964505,  0.44773271,  0.25235646,  0.4722281 ,  0.33003338])
>>> vals
array([ 0.2099343 ,  0.6532155 ,  0.33039334,  0.45366889,  0.48921527,
        0.20467291,  0.16585856,  0.74586928,  0.31234917,  0.22198642,
        0.21013519,  0.41422894,  0.26020981,  0.87505053,  0.1199069 ,
        0.06542492,  0.44145805,  0.08455833,  0.46824704,  0.28483342])

time to compute S: 0.000342130661011 seconds
time to compute vals: 7.29560852051e-05 seconds

编辑 4:好吧,如果您的单位向量大部分为正值,这应该运行得更快,假设 m 中的向量总是正的,就像它们在您的虚拟数据中一样。

m2 = np.average(m,0)
vals = np.zeros(N)
for i in xrange(N):
    u=nhat[i]
    if u[0] >= 0 and u[1] >= 0 and u[2] >= 0:
        vals[i] = abs(np.dot(u,m2))
    else:
        for j in xrange(M):
            vals[i]+=abs(np.dot(u,m[j]))
        vals[i]/=M
于 2013-07-12T21:39:32.063 回答