7

我需要扩展这个问题,它根据第二个数组的索引对数组的值求和。设A结果数组、B索引数组和要求和C的数组。然后就这么A[i] = sum过去了。Cindex(B) == i

相反,我的设置是

N = 5
M = 2

A = np.zeros((M,N))
B = np.random.randint(M, size=N) # contains indices for A
C = np.random.rand(N,N)

我需要 A[i,j] = sum_{k in 0...N} C[j,k]这样C[k] == i,即以匹配 i 的 B 的索引为条件的行和。有没有一种有效的方法来做到这一点?对于我的应用程序,N 约为 10,000,M 约为 20。在最小化问题中的每次迭代都会调用此操作……我当前的循环方法非常慢。

谢谢!

4

2 回答 2

4

在@DSM 的评论之后,我假设你C[k] == i应该是B[k] == i. 如果是这样的话,你的循环版本看起来像这样吗?

嵌套循环版本

import numpy as np

N = 5
M = 2

A = np.zeros((M,N))
B = np.random.randint(M, size=N) # contains indices for A
C = np.random.rand(N,N)

for i in range(M):
    for j in range(N):
        for k in range(N):
            if B[k] == i:
                A[i,j] += C[j,k]

有不止一种方法可以矢量化这个问题。我将在下面展示我的思考过程,但是有更有效的方法可以做到这一点(例如,@DSM 的版本可以识别问题中固有的矩阵乘法)。

为了解释起见,这里是一种方法的演练。


向量化内循环

让我们从重写内部k循环开始:

for i in range(M):
    for j in range(N):
        A[i,j] = C[j, B == i].sum()

将其视为 可能更容易C[j][B == i].sum()。我们只是选择的jC,仅选择该行中B等于的元素i,然后对它们求和。


向量化最外层循环

接下来让我们分解外i循环。现在我们要达到可读性开始受到影响的地步,不幸的是......

i = np.arange(M)[:,np.newaxis]
mask = (B == i).astype(int)
for j in range(N):
    A[:,j] = (C[j] * mask).sum(axis=-1)

这里有几个不同的技巧。在这种情况下,我们正在遍历A. 的每一列A是 的相应行的子集的总和C。行的子集C由 whereB等于行索引确定i

为了绕过迭代i,我们正在制作一个二维数组,其中B == i通过向 i 添加一个新轴。(如果您对此感到困惑,请查看numpy 广播的文档。)换句话说:

B:
    array([1, 1, 1, 1, 0])

i: 
    array([[0],
           [1]])

B == i:
    array([[False, False, False, False,  True],
           [ True,  True,  True,  True, False]], dtype=bool)

我们想要的是取两个 ( M) 过滤的总和C[j],一个用于B == i. j这将为我们提供与 中的第列对应的二元素向量A

我们不能通过C直接索引来做到这一点,因为结果不会保持它的形状,因为每一行可能有不同数量的元素。我们将通过将B == i掩码乘以 的当前行来解决这个问题,结果是C零,而当前行中的值为真。 B == iFalseC

为此,我们需要将布尔数组B == i转换为整数:

mask = (B == i).astype(int):
    array([[0, 0, 0, 0, 1],
           [1, 1, 1, 1, 0]])

因此,当我们将它乘以当前行时C

C[j]:
    array([ 0.19844887,  0.44858679,  0.35370919,  0.84074259,  0.74513377])

C[j] * mask:
    array([[ 0.        ,  0.        ,  0.        ,  0.        ,  0.74513377],
           [ 0.19844887,  0.44858679,  0.35370919,  0.84074259,  0.        ]])

然后我们可以对每一行求和以获得当前列A(这将在分配给时广播到一列A[:,j]):

(C[j] * mask).sum(axis=-1):
    array([ 0.74513377,  1.84148744])

全矢量化版本

最后,分解最后一个循环,我们可以应用完全相同的原理为循环添加第三个维度 over j

i = np.arange(M)[:,np.newaxis,np.newaxis]
mask = (B == i).astype(int)
A = (C * mask).sum(axis=-1)

@DSM 的矢量化版本

正如@DSM 建议的那样,您也可以这样做:

A = (B == np.arange(M)[:,np.newaxis]).dot(C.T)

M对于大多数大小的and来说,这是迄今为止最快的解决方案N,并且可以说是最优雅的(无论如何,比我的解决方案优雅得多)。

让我们分解一下。

这与上面的“矢量化最外层循环”部分 B == np.arange(M)[:,np.newaxis]完全相同。B == i

关键是要认识到所有的jandk循环都等价于矩阵乘法。 dot会将布尔B == i数组转换为与幕后相同的 dtype C,因此我们无需担心将其显式转换为不同的类型。

之后,我们只是对C(一个 5x5 数组)的转置和上面的“掩码”0 和 1 数组执行矩阵乘法,得到一个 2x5 数组。

dot将利用您已安装的任何优化的 BLAS 库(例如ATLASMKL),因此速度非常快。


计时

对于 small M's 和N's,差异不太明显(循环和 DSM 版本之间的约 6 倍):

M, N = 2, 5

%timeit loops(B,C,M)
10000 loops, best of 3: 83 us per loop

%timeit k_vectorized(B,C,M)
10000 loops, best of 3: 106 us per loop

%timeit vectorized(B,C,M)
10000 loops, best of 3: 23.7 us per loop

%timeit askewchan(B,C,M)
10000 loops, best of 3: 42.7 us per loop

%timeit einsum(B,C,M)
100000 loops, best of 3: 15.2 us per loop

%timeit dsm(B,C,M)
100000 loops, best of 3: 13.9 us per loop

M但是,一旦N开始增长,差异就会变得非常显着(~600x)(注意单位!):

M, N = 50, 20

%timeit loops(B,C,M)
10 loops, best of 3: 50.3 ms per loop

%timeit k_vectorized(B,C,M)
100 loops, best of 3: 10.5 ms per loop

%timeit ik_vectorized(B,C,M)
1000 loops, best of 3: 963 us per loop

%timeit vectorized(B,C,M)
1000 loops, best of 3: 247 us per loop

%timeit askewchan(B,C,M)
1000 loops, best of 3: 493 us per loop

%timeit einsum(B,C,M)
10000 loops, best of 3: 134 us per loop

%timeit dsm(B,C,M)
10000 loops, best of 3: 80.2 us per loop
于 2013-04-07T17:55:33.340 回答
3

我假设@DSM 找到了您的错字,并且您想要:

A[i,j] = sum_{k in 0...N} C[j,k] where B[k] == i

然后你可以循环,i in range(M)因为M它相对较小。

A = np.array([C[:,B == i].sum(axis=1) for i in range(M)])
于 2013-04-07T17:57:13.410 回答