0

我有两个大向量(长度相等),我正在计算一个滑动窗口点积:

import numpy as np

a = np.array([1, 2, 3, 4, 5, 6])
b = np.array([11, 22, 33, 44, 55, 66])

out = np.array(
    [[a[0]*b[0]+a[1]*b[1]+a[2]*b[2]],
     [a[1]*b[1]+a[2]*b[2]+a[3]*b[3]],
     [a[2]*b[2]+a[3]*b[3]+a[4]*b[4]],
     [a[3]*b[3]+a[4]*b[4]+a[5]*b[5]],
    ])

[[154]
 [319]
 [550]
 [847]]

当然,我可以调用点积函数,但如果窗口/向量长度很大,那么它的效率不如以下代码:

window = 3
result = np.empty([4,1])
result[0] = a[0]*b[0]+a[1]*b[1]+a[2]*b[2]
for i in range(3):
    result[i+1] = result[i]-a[i]*b[i]+a[i+window]*b[i+window]

[[154]
 [319]
 [550]
 [847]]

i+1th在这里,我们利用了点积与点积相似的事实ith。那是,

result[i+1] = result[i]-a[i]*b[i]+a[i+window]*b[i+window]

如何将我的 for 循环转换为矢量化函数,以便计算可以利用ith步骤中的信息,从而减少计算冗余,同时最大限度地减少所需的内存量。

更新

我实际上需要:

import numpy as np

a = np.array([1, 2, 3, 4, 5, 6])
b = np.array([11, 22, 33, 44, 55, 66, 77, 88])

out = np.array(
    [a[0]*b[0]+a[1]*b[1]+a[2]*b[2]+a[3]*b[3]]+a[4]*b[4]]+a[5]*b[5],
     a[0]*b[1]+a[1]*b[2]+a[2]*b[3]+a[3]*b[4]]+a[4]*b[5]]+a[5]*b[6],
     a[0]*b[2]+a[1]*b[3]+a[2]*b[4]+a[3]*b[5]]+a[4]*b[6]]+a[5]*b[7],
    ])

[1001
 1232
 1463]

因此a将滑过b并计算点积。

4

3 回答 3

2

您可以对 O(n) 复杂度使用部分总和:

ps = np.r_[0, np.cumsum(a*b)]
ps[3:]-ps[:-3]
# array([154, 319, 550, 847])

或者更接近原始for循环并避免非常大的部分和的变体:

k = 3
d = a*b
d[k:] -= d[:-k].copy()
np.cumsum(d)[k-1:]
# array([154, 319, 550, 847])

更新以匹配更新后的Q

这现在确实是一个卷积,所以@Divakar 的解决方案或多或少适用。a[::-1]只有,你会b直接卷积。如果速度是一个问题,您可以尝试替换它np.convolvescipy.signal.fftconvolve这取决于您的操作数的大小可能会明显更快。但是,对于非常小的操作数或长度差异很大的操作数,您甚至可能会损失一些速度,因此请务必尝试两种方法:

np.convolve(b, a[::-1], 'valid')
scipy.signal.fftconvolve(b, a[::-1], 'valid')
于 2017-03-21T19:04:44.387 回答
0

方法#1

用于np.convolve两个输入之间的元素乘法以及全为 1 的内核和size=3-

np.convolve(a*b,np.ones(3),'valid')

方法#2

由于我们只是对窗口中的元素求和,我们也可以使用uniform_filter,像这样 -

from scipy.ndimage.filters import uniform_filter1d as unif1d

def uniform_filter(a,W):
    hW = (W-1)//2
    return W*unif1d(a.astype(float),size=W, mode='constant')[hW:-hW]

out = uniform_filter(a*b,W=3)

基准测试

循环方法 -

def loopy_approach(a,b):
    window = 3
    N = a.size-window+1

    result = np.empty([N,1])
    result[0] = a[0]*b[0]+a[1]*b[1]+a[2]*b[2]
    for i in range(N-1):
        result[i+1] = result[i]-a[i]*b[i]+a[i+window]*b[i+window]
    return result

计时和验证 -

In [147]: a = np.random.randint(0,100,(1000))
     ...: b = np.random.randint(0,100,(1000))
     ...: 

In [148]: out0 = loopy_approach(a,b).ravel()
     ...: out1 = np.convolve(a*b,np.ones(3),'valid')
     ...: out2 = uniform_filter(a*b,W=3)
     ...: 

In [149]: np.allclose(out0,out1)
Out[149]: True

In [150]: np.allclose(out0,out2)
Out[150]: True

In [151]: %timeit loopy_approach(a,b)
     ...: %timeit np.convolve(a*b,np.ones(3),'valid')
     ...: %timeit uniform_filter(a*b,W=3)
     ...: 
100 loops, best of 3: 2.27 ms per loop
100000 loops, best of 3: 7 µs per loop
100000 loops, best of 3: 10.2 µs per loop
于 2017-03-21T18:03:23.030 回答
0

另一种使用步幅的方法是:

In [12]: from numpy.lib.stride_tricks import as_strided
In [13]: def using_strides(a, b, w=3):
              shape = a.shape[:-1] + (a.shape[-1] - w + 1, w)
              strides = a.strides + (a.strides[-1],)
              res = np.sum((as_strided(a, shape=shape, strides=strides) * \ 
                            as_strided(b, shape=shape, strides=strides)), axis=1)
              return res[:, np.newaxis]


In [14]: using_strides(a, b, 3)
Out[14]: 
array([[154],
       [319],
       [550],
       [847]])
于 2017-03-21T19:32:38.420 回答