1

我有这样的结构:

a = np.array([[np.array([1,2,3]), np.array([2])], [np.array([0,1,2,3,4]), np.array([0,4])]])

它是一个 2x2 结构,每个元素都可以有任意维度

我想得到每个元素的平均值a

np.mean(a)

ValueError: operands could not be broadcast together with shapes (3) (5) 

我试着玩axis,但它不起作用。我想得到一个新的np.array等于[[2, 2], [2, 2]

一般来说,我希望能够以a相同的方式运行任何矢量化函数。

怎么做?我需要快速代码,所以请避免显式循环。

我能做的最好的是:

f = np.mean
result = np.zeros(a.shape)
for i in np.ndindex(a.shape):
  result[i] = f(a[i])
4

3 回答 3

3

我猜你想要numpy.vectorize,它基本上就像mapndarrays 的内置函数。

>>> a = np.array([[np.array([1,2,3]), np.array([2])], [np.array([0,1,2,3,4]), np.array([0,4])]])
>>> vmean = np.vectorize(np.mean)
>>> vmean(a)
array([[ 2.,  2.],
       [ 2.,  2.]])
于 2013-02-27T01:05:52.743 回答
3

这是我能做的最好的:

a = np.array([[np.array([1,2,3]), np.array([2])], [np.array([0,1,2,3,4]), np.array([0,4])]])
np.array([ np.mean(b) for b in a.ravel()]).reshape(a.shape)

输出

array([[ 2.,  2.],
       [ 2.,  2.]])

可以概括为其他功能:

def ravel_func(func,arr):
    return np.asarray([ func(part) for part in arr.ravel()]).reshape(arr.shape)

速度测试,感谢Jaime

In [82]: timeit vmean(a)
10000 loops, best of 3: 65 us per loop

In [83]: timeit ravel_func(np.mean,a)
10000 loops, best of 3: 67 us per loop
于 2013-02-27T00:54:27.260 回答
2

根据您的评论:您拥有相对较少的相当大的元素。这意味着外循环的迭代速度无关紧要,而内循环的迭代速度至关重要。

让我们在这上面放一些实际的数字。您的外部数组最多有 4 个维度,最大大小为 10。这意味着最多有 10000 个元素。同时,元素“相当大”——让我们保守地解释为只有 50 个。所以,你有 510000 次循环迭代。您为提高 10000 次外部迭代的速度所做的任何事情都会使您的代码产生不到 2% 的差异。事实上,它远低于这个数字——2% 的人假设除了迭代本身之外没有其他工作要做,这显然是不正确的。

所以,你专注于错误的地方。只有 500000 次内部迭代很重要。如果您可以用单个高一维数组替换数组数组并在 中完成所有操作numpy,它可能会更快,但会使您的代码更加复杂且难以理解,以获得大约 1 的分数% 是愚蠢的。只需使用vectorize简单明了的答案或理解答案即可。


同时:

可能我应该尝试并行化评估,对每个矩阵元素使用一个线程。

并行性在这里是个好主意。但不使用线程,也不是每个元素一个。

例如,如果您有一台 8 核机器,使用 8 个硬件线程意味着您完成工作的速度几乎快 8 倍。但是使用 10000 个硬件线程并不意味着您完成任务的速度是 10000 倍,甚至是 8 倍——您可能会花费大量时间进行上下文切换、分配和释放线程堆栈等,实际上它比顺序执行花费的时间更长版本。因此,您想创建一个由 8 个硬件线程组成的工作池,并将 10000 个任务放在一个队列中以由该池运行。

此外,10000 可能太细了。每项工作都有一点开销,如果你的工作太小,你就会在开销上浪费太多时间。批量处理的明显方法是每个轴——有 1000 个作业,每个作业执行一行 10 个元素,或 100 个作业,每个作业执行一个包含 100 个元素的二维数组。测试 0、1、2 和 3 轴的批量大小,看看哪个提供了最佳性能。如果差异很大,您可能需要尝试更复杂的批处理,例如拆分为 3x10x10 和 4x10x10 的 3D 数组。

最后,虽然 Python 线程是真正的操作系统(因此也是硬件)线程,但 GIL 会阻止其中多个线程同时运行 Python 代码。numpy做了一些工作来解决这个问题,但它并不完美(特别是如果你在numpy调用之间有很多开销)。解决这个问题的最简单方法是使用multiprocessing,它可以让您拥有一个由 8 个独立进程组成的池,而不是 1 个进程中的 8 个线程。这会显着增加开销——您要么需要复制子数组(隐式地,作为任务函数的参数和返回值,要么显式地通过管道)或将它们放在共享内存中。但是,如果您的任务规模足够大,这通常不是问题。


以下是如何并行化现有代码的示例:

f = np.mean
result = np.zeros(a.shape)
future_map = {}
with futures.ProcessPoolExecutor(max_workers=8) as executor:
    for i in np.ndindex(a.shape):
        future_map[executor.submit(f, a[i])] = i
    for future in futures.as_completed(future_map):
        i = future_map[future]
        result[i] = future.result()

显然,您可以简化它(例如,用dict理解替换提交循环),但我想让您需要更改的内容尽可能明显。

另外,我正在使用futures(需要 3.2+;如果您使用的是 2.7,请从 PyPI 安装backport),因为它使代码更简单一些;如果您需要更大的灵活性,则需要更复杂的multiprocessing库。

最后,我没有做任何批处理——每个任务都是一个元素——所以开销可能会非常糟糕。

但是从这个开始,尽可能简化它,然后将其转换为使用 1、2 和 3 轴等批次,如前所述。

于 2013-02-27T18:48:37.413 回答