22

我试图沿着一个非常大的 2D numpy 数组的一维找到最小数组索引。我发现这非常慢(已经尝试用瓶颈加速它,这只是一个很小的改进)。但是,采用直线最小值似乎要快一个数量级:

import numpy as np
import time

randvals = np.random.rand(3000,160000)
start = time.time()
minval = randvals.min(axis=0)
print "Took {0:.2f} seconds to compute min".format(time.time()-start)
start = time.time()
minindex = np.argmin(randvals,axis=0)
print "Took {0:.2f} seconds to compute argmin".format(time.time()-start)

在我的机器上输出:

Took 0.83 seconds to compute min
Took 9.58 seconds to compute argmin

argmin 这么慢有什么原因吗?有什么办法可以将其加速到与 min 相当的速度吗?

4

3 回答 3

13
In [1]: import numpy as np

In [2]: a = np.random.rand(3000, 16000)

In [3]: %timeit a.min(axis=0)
1 loops, best of 3: 421 ms per loop

In [4]: %timeit a.argmin(axis=0)
1 loops, best of 3: 1.95 s per loop

In [5]: %timeit a.min(axis=1)
1 loops, best of 3: 302 ms per loop

In [6]: %timeit a.argmin(axis=1)
1 loops, best of 3: 303 ms per loop

In [7]: %timeit a.T.argmin(axis=1)
1 loops, best of 3: 1.78 s per loop

In [8]: %timeit np.asfortranarray(a).argmin(axis=0)
1 loops, best of 3: 1.97 s per loop

In [9]: b = np.asfortranarray(a)

In [10]: %timeit b.argmin(axis=0)
1 loops, best of 3: 329 ms per loop

也许min足够聪明,可以在数组上按顺序完成工作(因此具有缓存局部性),并且argmin在数组中跳跃(导致大量缓存未命中)?

无论如何,如果您愿意randvals从一开始就保留为 Fortran 有序数组,它会更快,尽管复制到 Fortran 有序数组没有帮助。

于 2013-07-24T18:00:47.013 回答
9

我只是看了一下源代码,虽然我不完全理解为什么事情会以这种方式完成,但会发生这种情况:

  1. np.min基本上是调用np.minimum.reduce.

  2. np.argmin首先将要操作的轴移动到形状元组的末尾,然后使其成为一个连续数组,这当然会触发整个数组的副本,除非该轴是最后一个开始的轴。

由于正在制作副本,您可以发挥创意并尝试实例化更便宜的数组:

a = np.random.rand(1000, 2000)

def fast_argmin_axis_0(a):
    matches = np.nonzero((a == np.min(a, axis=0)).ravel())[0]
    rows, cols = np.unravel_index(matches, a.shape)
    argmin_array = np.empty(a.shape[1], dtype=np.intp)
    argmin_array[cols] = rows
    return argmin_array

In [8]: np.argmin(a, axis=0)
Out[8]: array([230, 532, 815, ..., 670, 702, 989], dtype=int64)

In [9]: fast_argmin_axis_0(a)
Out[9]: array([230, 532, 815, ..., 670, 702, 989], dtype=int64)

In [10]: %timeit np.argmin(a, axis=0)
10 loops, best of 3: 27.3 ms per loop

In [11]: %timeit fast_argmin_axis_0(a)
100 loops, best of 3: 15 ms per loop

我不会将当前的实现称为错误,因为 numpy 可能有充分的理由按照它的方式进行操作,但是这种诡计可以加速应该是高度优化的功能,强烈建议事情可以做得更好。

于 2013-07-24T19:11:31.023 回答
1

我只是遇到了同样的问题,发现当轴0选择最小值时,性能差异很大。如建议的那样,问题似乎与内部复制数组有关。

我使用 Cython 设计了一个相当简单的解决方法,它同时沿给定轴建立最小值及其在 2D numpy 数组中的位置。请注意,对于axis = 0,该算法同时处理一组列(由常量指定的最大数量blocksize- 这里设置相当于 8 kByte 的数据)以充分利用缓存:

%%cython -c=-O2 -c=-march=native

import numpy as np
cimport numpy as np 
cimport cython
from libc.stdint cimport uint32_t

@cython.boundscheck(False)
@cython.wraparound(False)
cdef void _minargmin_2d_64_axis_0(uint32_t[:] minloc, double[:] minimum, double[:, :] arr) nogil:
    """
    Find the minimum and argmin for a 2D array of 64-bit floats along axis 0
    Parameters:
    -----------
    arr: numpy_array, dtype=np.float64, shape=(m, n)
       The array for which the minima should be computed.
    minloc: numpy_array, dtype=np.uint32, shape=(n)
       Stores the rows where the minima occur for each column.
    minimum: numpy_array, dtype=np.float64, shape=(n)
       The minima along each column.
    """

    # Columns of the matrix are accessed in blocks to increase cache performance.
    # Specify the number of columns here:

    DEF blocksize = 1024

    cdef int i, j, k
    cdef double minim[blocksize]
    cdef uint32_t minimloc[blocksize]

    # Read blocks of data to make good use of the cache

    cdef int blocks
    blocks  = arr.shape[1] / blocksize
    remains = arr.shape[1] % blocksize

    for i in xrange(0, blocksize * blocks, blocksize):

        for k in xrange(blocksize):
            minim[k]    = arr[0, i + k]
            minimloc[k] = 0

        for j in xrange(1, arr.shape[0]):

            for k in xrange(blocksize):

                if arr[j, i + k] < minim[k]:
                    minim[k] = arr[j, i + k]
                    minimloc[k] = j

        for k in xrange(blocksize):
            minimum[i + k] = minim[k]
            minloc[i + k]  = minimloc[k]

    # Work on the final 'incomplete' block

    i = blocksize * blocks

    for k in xrange(remains):
        minim[k]    = arr[0, i + k]
        minimloc[k] = 0

    for j in xrange(1, arr.shape[0]):

        for k in xrange(remains):

            if arr[j, i + k] < minim[k]:
                minim[k] = arr[j, i + k]
                minimloc[k] = j

    for k in xrange(remains):
        minimum[i + k] = minim[k]
        minloc[i + k]  = minimloc[k]

    # Done!


@cython.boundscheck(False)
@cython.wraparound(False)
cdef void _minargmin_2d_64_axis_1(uint32_t[:] minloc, double[:] minimum, double[:, :] arr) nogil:
    """
    Find the minimum and argmin for a 2D array of 64-bit floats along axis 1
    Parameters:
    -----------
    arr: numpy_array, dtype=np.float64, shape=(m, n)
       The array for which the minima should be computed.
    minloc: numpy_array, dtype=np.uint32, shape=(m)
       Stores the rows where the minima occur for each row.
    minimum: numpy_array, dtype=np.float64, shape=(m)
       The minima along each row.
    """
    cdef int i
    cdef int j
    cdef double minim
    cdef uint32_t minimloc

    for i in xrange(arr.shape[0]):
        minim = arr[i, 0]
        minimloc = 0

        for j in xrange(1, arr.shape[1]):
            if arr[i, j] < minim:
                minim = arr[i, j]
                minimloc = j

        minimum[i] = minim
        minloc[i]  = minimloc


@cython.boundscheck(False)
@cython.wraparound(False)
cdef void _minargmin_2d_32_axis_0(uint32_t[:] minloc, float[:] minimum, float[:, :] arr) nogil:
    """
    Find the minimum and argmin for a 2D array of 32-bit floats along axis 0
    Parameters:
    -----------
    arr: numpy_array, dtype=np.float32, shape=(m, n)
       The array for which the minima should be computed.
    minloc: numpy_array, dtype=np.uint32, shape=(n)
       Stores the rows where the minima occur for each column.
    minimum: numpy_array, dtype=np.float32, shape=(n)
       The minima along each column.
    """

    # Columns of the matrix are accessed in blocks to increase cache performance.
    # Specify the number of columns here:  

    DEF blocksize = 2048

    cdef int i, j, k
    cdef float minim[blocksize]
    cdef uint32_t minimloc[blocksize]

    # Read blocks of data to make good use of the cache

    cdef int blocks
    blocks  = arr.shape[1] / blocksize
    remains = arr.shape[1] % blocksize

    for i in xrange(0, blocksize * blocks, blocksize):

        for k in xrange(blocksize):
            minim[k]    = arr[0, i + k]
            minimloc[k] = 0

        for j in xrange(1, arr.shape[0]):

            for k in xrange(blocksize):

                if arr[j, i + k] < minim[k]:
                    minim[k] = arr[j, i + k]
                    minimloc[k] = j

        for k in xrange(blocksize):
            minimum[i + k] = minim[k]
            minloc[i + k]  = minimloc[k]

    # Work on the final 'incomplete' block

    i = blocksize * blocks

    for k in xrange(remains):
        minim[k]    = arr[0, i + k]
        minimloc[k] = 0

    for j in xrange(1, arr.shape[0]):

        for k in xrange(remains):

            if arr[j, i + k] < minim[k]:
                minim[k] = arr[j, i + k]
                minimloc[k] = j

    for k in xrange(remains):
        minimum[i + k] = minim[k]
        minloc[i + k]  = minimloc[k]

    # Done!

@cython.boundscheck(False)
@cython.wraparound(False)
cdef void _minargmin_2d_32_axis_1(uint32_t[:] minloc, float[:] minimum, float[:, :] arr) nogil:
    """
    Find the minimum and argmin for a 2D array of 32-bit floats along axis 1
    Parameters:
    -----------
    arr: numpy_array, dtype=np.float32, shape=(m, n)
       The array for which the minima should be computed.
    minloc: numpy_array, dtype=np.uint32, shape=(m)
       Stores the rows where the minima occur for each row.
    minimum: numpy_array, dtype=np.float32, shape=(m)
       The minima along each row.
    """
    cdef int i
    cdef int j
    cdef float minim
    cdef uint32_t minimloc

    for i in xrange(arr.shape[0]):
        minim = arr[i, 0]
        minimloc = 0

        for j in xrange(1, arr.shape[1]):
            if arr[i, j] < minim:
                minim = arr[i, j]
                minimloc = j

        minimum[i] = minim
        minloc[i]  = minimloc

def Min_Argmin(array_2d, axis = 1):
    """
    Find the minima and corresponding locations (argmin) of a two-dimensional
    numpy array of floats along a given axis simultaneously, and returns them
    as a tuple of arrays (min_2d, argmin_2d).

    (Note: float16 arrays will be internally transformed to float32 arrays.)
    ----------
    array_2d: numpy_array, dtype=np.float32 or np.float64, shape=(m, n)
       The array for which the minima should be computed.
    axis : int, 0 or 1 (default) : 
        The axis along which minima are computed.
    min_2d: numpy_array, dtype=np.uint8, shape=(m) or shape=(n):
       The array where the minima along the given axis are stored.
    argmin_2d:
       The array storing the row/column where the minimum occurs.
    """

    # Sanity checks:
    if len(array_2d.shape) != 2:
        raise IndexError('Min_Argmin: Number of dimensions of array must be 2')

    if not (axis == 0 or axis == 1):
        raise ValueError('Min_Argmin: Invalid axis specified')

    arr_type = array_2d.dtype

    if not arr_type in ('float16', 'float32', 'float64'):
        raise ValueError('Min_Argmin: Not a float array')

    # Transform float16 arrays
    if arr_type == 'float16':
        array_2d = array_2d.astype(np.float32)

    # Run analysis

    if arr_type == 'float64':

        # Double accuracy

        min_array    = np.zeros(array_2d.shape[1 - axis], dtype = np.float64)
        argmin_array = np.zeros(array_2d.shape[1 - axis], dtype = np.uint32)

        if (axis == 0):
            _minargmin_2d_64_axis_0(argmin_array, min_array, array_2d)

        else:
            _minargmin_2d_64_axis_1(argmin_array, min_array, array_2d)

    else:

        # Single accuracy

        min_array    = np.zeros(array_2d.shape[1 - axis], dtype = np.float32)
        argmin_array = np.zeros(array_2d.shape[1 - axis], dtype = np.uint32)

        if (axis == 0):
            _minargmin_2d_32_axis_0(argmin_array, min_array, array_2d)

        else:
            _minargmin_2d_32_axis_1(argmin_array, min_array, array_2d)

        # Transform back to float16 type as necessary

        if arr_type == 'float16':
            min_array = min_array.astype(np.float16)


    # Return results
    return min_array, argmin_array

加载 Cython 支持后,可以在 IPython 笔记本单元中放置和编译代码:

%load_ext Cython

然后以如下形式调用:

min_array, argmin_array = Min_Argmin(two_dim_array, axis = 0 or 1)

时序示例:

random_array = np.random.rand(20000, 20000).astype(np.float32)

%timeit min_array, argmin_array = Min_Argmin(random_array, axis = 0)
%timeit min_array, argmin_array = Min_Argmin(random_array, axis = 1)

1 loops, best of 3: 405 ms per loop
1 loops, best of 3: 307 ms per loop

为了比较:

%%timeit 
min_array    = random_array.min(axis = 0)
argmin_array = random_array.argmin(axis = 0)

1 loops, best of 3: 10.3 s per loop

%%timeit 
min_array    = random_array.min(axis = 1)
argmin_array = random_array.argmin(axis = 1)

1 loops, best of 3: 423 ms per loop

所以,有一个显着的加速(如果一个人对最小值和位置都感兴趣的话axis = 0,仍然有一个小的优势)。axis = 1

于 2015-11-10T14:49:14.083 回答