1

我正在尝试编写一个支持广播并且同时速度很快的函数。但是,numpy 的零秩数组照常造成麻烦。我在谷歌上找不到任何有用的东西,或者在这里搜索。

所以,我问你。我应该如何有效地实现广播并同时处理零秩数组?

整个帖子变得比预期的要大,对不起。

细节:

为了澄清我在说什么,我将举一个简单的例子:

假设我想实现一个Heaviside step-function。即作用于实轴的函数,在负侧为 0,在正侧为 1,并且在点 0 处根据情况分别为 0、0.5 或 1。

执行

掩蔽

到目前为止,我发现的最有效的方法如下。它使用布尔数组作为掩码,将正确的值分配给输出向量中的相应槽。

from numpy import *

def step_mask(x, limit=+1):
    """Heaviside step-function.

    y = 0  if x < 0
    y = 1  if x > 0
    See below for x == 0.

    Arguments:
        x       Evaluate the function at these points.
        limit   Which limit at x == 0?
                limit >  0:  y = 1
                limit == 0:  y = 0.5
                limit <  0:  y = 0

    Return:
        The values corresponding to x.
    """
    b = broadcast(x, limit)

    out = zeros(b.shape)

    out[x>0] = 1

    mask = (limit > 0) & (x == 0)
    out[mask] = 1
    mask = (limit == 0) & (x == 0)
    out[mask] = 0.5
    mask = (limit < 0) & (x == 0)
    out[mask] = 0

    return out

列表理解

以下numpy-docs方法是在广播对象的平面迭代器上使用列表推导。但是,对于如此复杂的函数,列表推导变得绝对不可读。

def step_comprehension(x, limit=+1):
    b = broadcast(x, limit)

    out = empty(b.shape)

    out.flat = [ ( 1 if x_ > 0 else
                  ( 0 if x_ < 0 else
                   ( 1 if l_ > 0 else
                    ( 0.5 if l_ ==0 else
                     ( 0 ))))) for x_, l_ in b ]

    return out

循环

最后,最幼稚的方法是 for 循环。这可能是最易读的选项。然而,Python for 循环并不快。因此,在数字方面是一个非常糟糕的主意。

def step_for(x, limit=+1):
    b = broadcast(x, limit)

    out = empty(b.shape)

    for i, (x_, l_) in enumerate(b):
        if x_ > 0:
            out[i] = 1
        elif x_ < 0:
            out[i] = 0
        elif l_ > 0:
            out[i] = 1
        elif l_ < 0:
            out[i] = 0
        else:
            out[i] = 0.5

    return out

测试

首先简单测试一下,看看输出是否正确。

>>> x = array([-1, -0.1, 0, 0.1, 1])
>>> step_mask(x, +1)
array([ 0.,  0.,  1.,  1.,  1.])
>>> step_mask(x, 0)
array([ 0. ,  0. ,  0.5,  1. ,  1. ])
>>> step_mask(x, -1)
array([ 0.,  0.,  0.,  1.,  1.])

这是正确的,其他两个函数给出相同的输出。

表现

效率如何?这些是时间:

In [45]: xl = linspace(-2, 2, 500001)

In [46]: %timeit step_mask(xl)
10 loops, best of 3: 19.5 ms per loop

In [47]: %timeit step_comprehension(xl)
1 loops, best of 3: 1.17 s per loop

In [48]: %timeit step_for(xl)
1 loops, best of 3: 1.15 s per loop

蒙面版本​​的性能与预期的一样好。但是,令我惊讶的是理解与 for 循环处于同一水平。

零秩数组

但是,0 秩数组会带来问题。有时您想使用函数标量输入。并且最好不必担心将所有标量包装在至少一维数组中。

>>> step_mask(1)
Traceback (most recent call last):
  File "<ipython-input-50-91c06aa4487b>", line 1, in <module>
    step_mask(1)
  File "script.py", line 22, in step_mask
    out[x>0] = 1
IndexError: 0-d arrays can't be indexed.

>>> step_for(1)
Traceback (most recent call last):
  File "<ipython-input-51-4e0de4fcb197>", line 1, in <module>
    step_for(1)
  File "script.py", line 55, in step_for
    out[i] = 1
IndexError: 0-d arrays can't be indexed.

>>> step_comprehension(1)
array(1.0)

只有列表推导可以处理 0 秩数组。其他两个版本需要对 0 秩数组进行特殊处理。

当您想对数组和标量使用相同的代码时,Numpy 会有点混乱。但是,我真的很喜欢尽可能多地处理任意输入的函数。谁知道我会在某个时候迭代哪些参数。

问题:

实现上述功能的最佳方法是什么?有没有办法避免if scalar then类似的特殊情况?

我不是在寻找内置的 Heaviside。这只是一个简化的例子。在我的代码中,上述模式出现在许多地方,以使参数迭代尽可能简单,而不会在客户端代码中乱扔 for 循环或理解。

此外,我知道 Cython,或 weave & Co.,或直接在 C 中实现。但是,上述掩码版本的性能目前就足够了。目前我想让事情尽可能简单。

更新:

在 Ophion 和 DaveP 之后,我改进了掩码版本,如下所示:

def step_mask_improved(x, limit=+1):
    b = np.broadcast(x, limit)
    out=atleast_1d(np.zeros(b.shape))

    out[np.where(x>0)]=1

    zeroindices=np.where(x==0)
    check=out[zeroindices]

    check=np.where(limit>0,1,check)
    check=np.where(limit==0,.5,check)
    check=np.where(limit<0,0,check)
    out[zeroindices]=check

    return out.reshape(b.shape)

它与 Ophium 的解决方案一样快。

In [13]: %timeit step_mask(xl)
100 loops, best of 3: 11.1 ms per loop

In [14]: %timeit step_mask2(xl)
100 loops, best of 3: 9.11 ms per loop

In [15]: %timeit step_mask_improved(xl)
100 loops, best of 3: 9.13 ms per loop

但是,如果输入是标量,它可以处理零秩数组并且仍然返回标量。

In [7]: step_mask_improved(1)
Out[7]: array(1.0)

还有什么建议吗?

4

2 回答 2

1

使用numpy.where而不是索引(其中也往往会更快一些),例如替换:

out[mask] = 0.

和:

out = numpy.where(mask, 0., out)

当你这样做时,你的函数最终看起来像这样:

def step_mask(x, limit=+1):
    mask = x > 0
    out = where(mask, 1., 0.)

    # Save this result to avoid re-computing
    x_eq_zero = x == 0

    mask = (limit > 0) & x_eq_zero
    out = where(mask, 1, out)
    mask = (limit == 0) & x_eq_zero
    out = where(mask, .5, out)
    mask = (limit < 0) & x_eq_zero
    out = where(mask, 0, out)

    return out
于 2013-07-01T21:42:44.617 回答
1

需要考虑的是减少将要执行第二次检查的数组的大小。我不知道如何绕过这些if scalar then陈述。

def step_mask2(x,limit=+1):
    b = np.broadcast(x, limit)
    out=np.zeros(b.shape)

    out[np.where(x>0)]=1

    zeroindices=np.where(x==0)
    check=out[zeroindices]

    check=np.where(limit>0,1,check)
    check=np.where(limit==0,.5,check)
    check=np.where(limit<0,0,check)
    out[zeroindices]=check

    return out

除了标量之外,还有什么限制吗?如果不是,最好使用 if 语句而不是 np.where。

时机稍微好一点:

Yours took 0.0330839157104 seconds.
Mine took 0.0210998058319 seconds.

使用 DaveP 的 atleast1d 想法进行更新:

def step_mask_improved(x, limit=+1):
    b = np.broadcast(x, limit)
    out=np.atleast_1d(np.zeros(b.shape))
    out[np.where(x>0)]=1

    zeroindices=np.where(x==0)
    check=out[zeroindices]

    check=np.where(limit>0,1,check)
    check=np.where(limit==0,.5,check)
    check=np.where(limit<0,0,check)
    out[zeroindices]=check

    return out

我不确定你为什么要重塑 - 它是什么导致零秩数组返回标量,但如果它是一个问题。

    return np.atleast_1d(out.reshape(b.shape))
于 2013-07-01T22:54:17.693 回答