16

我正在尝试使用 NumPy 和矢量化操作来使一段代码运行得更快。但是,我似乎对如何向量化此代码有误解(可能是由于对向量化的理解不完整)。

这是带有循环的工作代码(A和B是设定大小的二维数组,已经初始化):

for k in range(num_v):
    B[:] = A[:]
    for i in range(num_v):
        for j in range(num_v):
            A[i][j] = min(B[i][j], B[i][k] + B[k][j])
return A

这是我对上述代码进行矢量化的尝试:

for k in range(num_v):
    B = numpy.copy(A)
    A = numpy.minimum(B, B[:,k] + B[k,:])
return A

为了测试这些,我使用了以下代码,上面的代码包含在一个名为“算法”的函数中:

def setup_array(edges, num_v):
    r = range(1, num_v + 1)
    A = [[None for x in r] for y in r]  # or (numpy.ones((num_v, num_v)) * 1e10) for numpy
    for i in r:
        for j in r:
            val = 1e10
            if i == j:
                val = 0 
            elif (i,j) in edges:
                val = edges[(i,j)]
            A[i-1][j-1] = val 
    return A

A = setup_array({(1, 2): 2, (6, 4): 1, (3, 2): -3, (1, 3): 5, (3, 6): 5, (4, 5): 2, (3, 1): 4, (4, 3): 8, (3, 4): 6, (2, 4): -4, (6, 5): -5}, 6) 
B = []
algorithm(A, B, 6)

预期的结果,以及我使用第一个代码得到的是:

[[0, 2, 5, -2, 0, 10] 
 [8, 0, 4, -4, -2, 9]
 [4, -3, 0, -7, -5, 5]
 [12, 5, 8, 0, 2, 13]
 [10000000000.0, 9999999997.0, 10000000000.0, 9999999993.0, 0, 10000000000.0]
 [13, 6, 9, 1, -5, 0]]

第二个(矢量化)函数改为返回:

[[ 0. -4.  0.  0.  0.  0.]
 [ 0. -4.  0. -4.  0.  0.]
 [ 0. -4.  0.  0.  0.  0.]
 [ 0. -4.  0.  0.  0.  0.]
 [ 0. -4.  0.  0.  0.  0.]
 [ 0. -4.  0.  0. -5.  0.]]

我错过了什么?

4

2 回答 2

16

通常你想对代码进行矢量化,因为你认为它运行得太慢了。
如果您的代码太慢,那么我可以告诉您正确的索引会使其更快。
而不是A[i][j]你应该写A[i, j]- 这避免了(子)数组的临时副本。
由于您在代码的最内层循环中执行此操作,这可能会非常昂贵。

看这里:

In [37]: timeit test[2][2]
1000000 loops, best of 3: 1.5 us per loop

In [38]: timeit test[2,2]
1000000 loops, best of 3: 639 ns per loop

在您的代码中始终如一地执行此操作——我坚信这已经解决了您的性能问题!

话说回来...

...这是我对如何矢量化的看法

for k in range(num_v):
    numpy.minimum(A, np.add.outer(A[:,k], A[k,:]), A)
return A

numpy.minimum将比较两个数组并按元素返回两个元素中较小的一个。如果您传递第三个参数,它将获取输出。如果这是一个输入数组,则整个操作就位。

正如 Peter de Rivay 解释的那样,您的广播解决方案存在问题 - 但从数学上讲,您想要做的是某种外积而不是两个向量的相加。因此,您可以在 add 函数上使用外部操作。

NumPy 的二进制 ufunc 有特殊的方法来执行某些特殊的向量化操作,如 reduce、accumulate、sum 和 outer。

于 2013-01-21T21:41:34.383 回答
12

该问题是由该行中的数组广播引起的:

A = numpy.minimum(B, B[:,k] + B[k,:])

B 的大小为 6 x 6,B[:,k] 是一个包含 6 个元素的数组,B[k,:] 是一个包含 6 个元素的数组。

(因为您使用的是 numpy 数组类型,所以 B[:,k] 和 B[k,:] 都返回形状为 N 的 rank-1 数组)

Numpy 会自动更改大小以匹配:

  1. 首先将 B[:,k] 添加到 B[k,:] 以生成具有 6 个元素的中间数组结果。(这不是你想要的)
  2. 其次,这个 6 元素数组通过重复行广播到 6 x 6 矩阵
  3. 第三,计算原始矩阵和该广播矩阵的最小值。

这意味着您的 numpy 代码相当于:

for k in range(num_v):
   B[:] = A[:]
   C=[B[i][k]+B[k][i] for i in range(num_v)]
   for i in range(num_v):
      for j in range(num_v):
         A[i][j] = min(B[i][j], C[j])

修复代码的最简单方法是使用矩阵类型而不是数组类型:

A = numpy.matrix(A)
for k in range(num_v):
    A = numpy.minimum(A, A[:,k] + A[k,:])

矩阵类型使用更严格的广播规则,因此在这种情况下:

  1. A[:,k] 通过重复列扩展为 6 x 6 矩阵
  2. A[k,:] 通过重复行扩展为 6 x 6 矩阵
  3. 广播的矩阵加在一起形成一个 6 x 6 矩阵
  4. 应用最小值
于 2013-01-21T22:33:58.853 回答