6

我正在尝试在两个 3D 数组上广播“>”的简单操作。一个具有尺寸 (m, 1, n),另一个具有尺寸 (1, m, n)。如果我更改第三维 (n) 的值,我会天真地期望计算速度会缩放为 n。

但是,当我尝试明确测量这一点时,我发现当 n 从 1 增加到 2 时,计算时间增加了大约 10 倍,之后缩放是线性的。

为什么从 n=1 到 n=2 时计算时间会急剧增加?我假设它是 numpy 中的内存管理工件,但我正在寻找更多细节。

代码附在下面的结果图中。

import numpy as np
import time
import matplotlib.pyplot as plt

def compute_time(n):

    x, y = (np.random.uniform(size=(1, 1000, n)), 
            np.random.uniform(size=(1000, 1, n)))

    t = time.time()
    x > y 
    return time.time() - t

a = [
        [
            n, np.asarray([compute_time(n) 
            for _ in range(100)]).mean()
        ]
        for n in range(1, 30, 1)
    ]

a = np.asarray(a)
plt.plot(a[:, 0], a[:, 1])
plt.xlabel('n')
plt.ylabel('time(ms)')
plt.show()

广播操作的时间图

在此处输入图像描述

4

2 回答 2

6

我无法证明这一点,但我很确定这是由于一项仅在 n==1 时可用的简单优化。

目前,numpy ufunc 实现基于最内层循环的计算机生成代码,该代码映射到一个简单的 C 循环。封闭循环需要使用完全成熟的迭代器对象,这取决于有效负载,即最内层循环的大小和原子操作的成本可能是一个很大的开销。

现在,在 n==1 处,问题本质上是二维的(numpy 足够聪明,可以检测到),最内层的循环大小为 1000,因此迭代器对象有 1000 步。从 n==2 向上,最里面的循环的大小为 n,我们有 1,000,000 步的迭代器对象,它解释了您正在观察的跳转。

正如我所说,我无法证明它,但我可以让它看起来合理:如果我们将变量维度移到前面,那么最内层循环的大小恒定为 1000,而外层循环在 1000 个迭代步骤中线性增长。确实,这使跳跃消失了。

在此处输入图像描述

代码:

import numpy as np
import time
import matplotlib.pyplot as plt

def compute_time(n, axis=2):
    xs, ys = [1, 10], [10, 1]
    xs.insert(axis, n)
    ys.insert(axis, n)
    x, y = (np.random.uniform(size=xs),
            np.random.uniform(size=ys))

    t = time.perf_counter()
    x > y
    return time.perf_counter() - t

a = [
        [
            n,
            np.asarray([compute_time(n) for _ in range(100)]).mean(),
            np.asarray([compute_time(n, 0) for _ in range(100)]).mean()
        ]
        for n in range(0, 10, 1)
     ]

a = np.asarray(a)
plt.plot(a[:, 0], a[:, 1:])
plt.xlabel('n')
plt.ylabel('time(ms)')
plt.show()

相关:https ://stackoverflow.com/a/48257213/7207392

于 2018-10-12T10:02:44.380 回答
5

@Paul 的理论非常正确。在这个答案中,我使用perf调试器来深入研究以支持这个理论。

首先,让我们看看运行时间都花在了哪些地方(具体代码请参见下面的 run.py 清单)。

因为n=1我们看到以下内容:

Event count (approx.): 3388750000
Overhead  Command  Shared Object                               Symbol                                                               
  34,04%  python   umath.cpython-36m-x86_64-linux-gnu.so       [.] DOUBLE_less
  32,71%  python   multiarray.cpython-36m-x86_64-linux-gnu.so  [.] _aligned_strided_to_contig_size8_srcstride0
  28,16%  python   libc-2.23.so                                [.] __memmove_ssse3_back
   1,46%  python   multiarray.cpython-36m-x86_64-linux-gnu.so  [.] PyArray_TransferNDimToStrided

相比n=2

Event count (approx.): 28954250000                                                              
Overhead  Command  Shared Object                               Symbol                                                               
  40,85%  python   libc-2.23.so                                [.] __memmove_ssse3_back
  40,16%  python   multiarray.cpython-36m-x86_64-linux-gnu.so  [.] PyArray_TransferNDimToStrided
   8,61%  python   umath.cpython-36m-x86_64-linux-gnu.so       [.] DOUBLE_less
   8,41%  python   multiarray.cpython-36m-x86_64-linux-gnu.so  [.] _contig_to_contig

当 n=2 时,计数的事件数增加了 8.5 倍,但只有两倍的数据,所以我们需要解释 4 的减速因子。

另一个重要的观察:运行时间主要由内存操作n=2和(不太明显)也为n=1_aligned_strided_to_contig_size8_srcstride0都是关于复制数据),它们超过了比较的成本 - DOUBLE_less

很明显,PyArray_TransferNDimtoStrided两个size都调用了,那么为什么它的运行时间占比会有这么大的差别呢?

显示的 self-timePyArray_TransferNDimtoStrided不是复制所需的时间,而是开销:调整指针,以便在最后一个维度可以通过以下方式一次性复制stransfer

 PyArray_TransferNDimToStrided(npy_intp ndim,
 ....
 /* A loop for dimensions 0 and 1 */
 for (i = 0; i < shape1; ++i) {
    if (shape0 >= count) {
        stransfer(dst, dst_stride, src, src_stride0,
                    count, src_itemsize, data);
        return 0;
    }
    else {
        stransfer(dst, dst_stride, src, src_stride0,
                    shape0, src_itemsize, data);
    }
    count -= shape0;
    src += src_stride1;
    dst += shape0*dst_stride;
}
...

这些 stransfer 函数是_aligned_strided_to_contig_size8_srcstride0(请参阅下面清单中生成的代码)和_contig_to_contig

  • _contig_to_contig用于n=2传输 2-doubles 的情况下(最后一个维度有 2 个值),调整指针的开销非常高!
  • _aligned_strided_to_contig_size8_srcstride0用于n=1每次调用并传输 1000 个双精度数(正如@Paul 指出的那样,我们很快就会看到,numpy 足够聪明地丢弃尺寸为 1 个元素的长度),调整指针的开销可以忽略不计。

顺便说一句,为了使用现代 CPU 的矢量化,使用这些函数而不是简单的 for 循环:在编译时已知步幅,编译器能够对代码进行矢量化(对于仅已知的步幅,编译器通常无法做到这一点runtime),因此 numpy 分析访问模式并分派给不同的预编译函数。

剩下一个问题:numpy 真的会丢弃最后一个维度,如果它的大小为 1,正如我们的观察所表明的那样?

使用调试器很容易验证:

至于在比较时“丢失”的速度因子:4它没有特殊含义,只是我机器上的一个随机值:将矩阵的维度从 10^3 更改为 10^4 将进一步改变优势(更少的开销)甚至更远到-case,这导致我的机器失去速度因子 12。n=2n=1n=1


运行.py

import sys
import numpy as np

n=int(sys.argv[1])

x, y = (np.random.uniform(size=(1, 1000, n)), 
        np.random.uniform(size=(1000, 1, n)))

for _ in range(10000):
    y<x

接着:

perf record python run.py 1
perf report
....
perf record python run.py 2
perf report

生成的来源_aligned_strided_to_contig_size8_srcstride0

/*
 * specialized copy and swap for source stride 0,
 * interestingly unrolling here is like above is only marginally profitable for
 * small types and detrimental for >= 8byte moves on x86
 * but it profits from vectorization enabled with -O3
 */
#if (0 == 0) && 1
static NPY_GCC_OPT_3 void
_aligned_strided_to_contig_size8_srcstride0(char *dst,
                        npy_intp dst_stride,
                        char *src, npy_intp NPY_UNUSED(src_stride),
                        npy_intp N, npy_intp NPY_UNUSED(src_itemsize),
                        NpyAuxData *NPY_UNUSED(data))
{
#if 8 != 16
#  if !(8 == 1 && 1)
    npy_uint64 temp;
#  endif
#else
    npy_uint64 temp0, temp1;
#endif
    if (N == 0) {
        return;
    }
#if 1 && 8 != 16
    /* sanity check */
    assert(npy_is_aligned(dst, _ALIGN(npy_uint64)));
    assert(npy_is_aligned(src, _ALIGN(npy_uint64)));
#endif
#if 8 == 1 && 1
    memset(dst, *src, N);
#else

#  if 8 != 16
    temp = _NPY_NOP8(*((npy_uint64 *)src));
#  else
#    if 0 == 0
        temp0 = (*((npy_uint64 *)src));
        temp1 = (*((npy_uint64 *)src + 1));
#    elif 0 == 1
        temp0 = _NPY_SWAP8(*((npy_uint64 *)src + 1));
        temp1 = _NPY_SWAP8(*((npy_uint64 *)src));
#    elif 0 == 2
        temp0 = _NPY_SWAP8(*((npy_uint64 *)src));
        temp1 = _NPY_SWAP8(*((npy_uint64 *)src + 1));
#    endif
#  endif

    while (N > 0) {
#  if 8 != 16
        *((npy_uint64 *)dst) = temp;
#  else
        *((npy_uint64 *)dst) = temp0;
        *((npy_uint64 *)dst + 1) = temp1;
#  endif
#  if 1
        dst += 8;
#  else
        dst += dst_stride;
#  endif
        --N;
    }
#endif/* @elsize == 1 && 1 -- else */
}
#endif/* (0 == 0) && 1 */
于 2018-11-03T00:31:29.270 回答