0

系统

操作系统: Windows 10 (x64),Build 1909
Python 版本: 3.8.10
Numpy 版本: 1.21.2

问题

给定两个浮点数据点的 2D (N, 3)Numpy 数组,(x, y, z)在一个数组中查找点与另一个数组中的点相等的索引的 Pythonic(矢量化)方法是什么?

注意: 我的问题不同之处在于我需要它来处理实际数据集,其中两个数据集可能因浮点错误而有所不同。请阅读下面的详细信息。

历史

类似的问题已经被问过很多次了:

  1. 如何找到另一个二维数组中出现的二维numpy数组的索引
  2. 测试二维 numpy 数组中的成员资格
  3. 获取 Numpy 2d 数组相交行的索引
  4. 在另一个二维数组中查找 numpy 二维数组行的索引
  5. Numpy 2d Array 相交行的索引
  6. 在另一个二维数组中查找 numpy 二维数组行的索引

以前的尝试

SO Post 1提供了一个工作列表理解解决方案,但我正在寻找一种能够很好地扩展到大型数据集(即数百万点)的解决方案:

代码1

import numpy as np

if __name__ == "__main__":
    big_array = np.array(
        [
            [1.0, 2.0, 1.2],
            [5.0, 3.0, 0.12],
            [-1.0, 14.0, 0.0],
            [-9.0, 0.0, 13.0],
        ]
    )

    small_array = np.array(
        [
            [5.0, 3.0, 0.12],
            [-9.0, 0.0, 13.0],
        ]
    )

    inds = [
        ndx
        for ndx, barr in enumerate(big_array)
        for sarr in small_array
        if all(sarr == barr)
    ]

    print(inds)

输出1

[1, 2]

尝试SO Post 3的解决方案(类似于SO Post 2),但使用浮点数不起作用(我怀疑np.isclose需要使用一些东西):

代码3

import numpy as np

if __name__ == "__main__":
    big_array = np.array(
        [
            [1.0, 2.0, 1.2],
            [5.0, 3.0, 0.12],
            [-1.0, 14.0, 0.0],
            [-9.0, 0.0, 13.0],
        ],
        dtype=float,
    )

    small_array = np.array(
        [
            [5.0, 3.0, 0.12],
            [-9.0, 0.0, 13.0],
        ],
        dtype=float,
    )

    inds = np.nonzero(
        np.in1d(big_array.view("f,f").reshape(-1), small_array.view("f,f").reshape(-1))
    )[0]

    print(inds)

输出3

[ 3  4  5  8  9 10 11]

我的尝试

我试过numpy.isinnp.allnp.argwhere

inds = np.argwhere(np.all(np.isin(big_array, small_array), axis=1)).reshape(-1)

哪个有效(并且,我认为,更具可读性和可理解性;即pythonic),但不适用于包含浮点错误的真实数据集:

import numpy as np

if __name__ == "__main__":
    big_array = np.array(
        [
            [1.0, 2.0, 1.2],
            [5.0, 3.0, 0.12],
            [-1.0, 14.0, 0.0],
            [-9.0, 0.0, 13.0],
        ],
        dtype=float,
    )

    small_array = np.array(
        [
            [5.0, 3.0, 0.12],
            [-9.0, 0.0, 13.0],
        ],
        dtype=float,
    )

    small_array_fpe = np.array(
        [
            [5.0 + 1e-9, 3.0 + 1e-9, 0.12 + 1e-9],
            [-9.0 + 1e-9, 0.0 + 1e-9, 13.0 + 1e-9],
        ],
        dtype=float,
    )

    inds_no_fpe = np.argwhere(np.all(np.isin(big_array, small_array), axis=1)).reshape(-1)

    inds_with_fpe = np.argwhere(
        np.all(np.isin(big_array, small_array_fpe), axis=1)
    ).reshape(-1)

    print(f"No Floating Point Error: {inds_no_fpe}")
    print(f"With Floating Point Error: {inds_with_fpe}")

    print(f"Are 5.0 and 5.0+1e-9 close?: {np.isclose(5.0, 5.0 + 1e-9)}")

输出:

No Floating Point Error: [1 3]
With Floating Point Error: []
Are 5.0 and 5.0+1e-9 close?: True

如何通过合并使我的上述解决方案工作(在具有浮点错误的数据集上)np.isclose?欢迎替代解决方案。

注意: 由于small_array是 的子集big_array,因此无法直接使用,np.isclose因为形状不会广播:

np.isclose(big_array, small_array_fpe)

产量

ValueError: operands could not be broadcast together with shapes (4,3) (2,3)

更新

目前,我唯一可行的解​​决方案是

inds_with_fpe = [
    ndx
    for ndx, barr in enumerate(big_array)
    for sarr in small_array_fpe
    if np.all(np.isclose(sarr, barr))
]
4

3 回答 3

2

我不会给出任何代码,但我已经大规模处理过类似的问题。我怀疑要使用这些方法中的任何一种获得不错的性能,您需要在 C 中实现核心(您可能会使用 numba)。

如果您的两个阵列都很大,那么有几种方法可以工作。这些主要归结为构建一个结构,该结构可用于从其中一个数组中找到一个点的最近邻居,然后查询另一个数据集中的每个点。

为此,我之前使用了 Kd Tree 方法和基于网格的方法。

基于网格的方法的基础是

  • 找到你的第一个数组的 3D 范围。
  • 将该区域分成 L N M 个 bin。
  • 对于第二个数组中的每个输入点,找到它的 bin。任何与它匹配的点都将在该 bin 中。

您需要处理的边缘情况是

  • 如果该点落在一个 bin 的边缘,或者足够接近被认为等于它的 bin 的边界,则可能落在另一个 bin 中 - 那么您需要搜索多个 bin 来寻找它的“相等”。
  • 如果该点落在所有 bin 之外,但靠近边缘,则“等于”它的点可能落在附近的 bin 中。

缺点是这对非均匀分布的数据不利。

好处是它相对简单。统一数据的预期运行时间为n1 * n2 / (L*N*M)(与 n1*n2 相比)。通常,您选择 L,N,M 以使其变为O(n log(n))。您还可以从对第二个数组进行排序以提高 bin 的重用性获得进一步的提升。并行化也相对容易(分箱和搜索)

Kd 树方法是类似的。IIRC 它提供了O(n log(n))行为,但实现起来比较棘手,并且数据结构的构建很难并行化)。它往往对缓存不友好,这可能意味着尽管它的渐近运行时间比基于网格的方法更好,但它在实践中可能运行得更慢。然而,它确实为非均匀分布的数据提供了更好的保证。

于 2021-09-13T03:00:12.830 回答
2

正如@Michael Anderson 已经提到的,这可以使用kd-tree 来实现。与您的答案相比,此解决方案使用绝对错误。这是否可以接受取决于问题。

例子

import numpy as np
from scipy import spatial

def find_nearest(big_array,small_array,tolerance):
    tree_big=spatial.cKDTree(big_array)
    tree_small=spatial.cKDTree(small_array)
    return tree_small.query_ball_tree(tree_big,r=tolerance)

计时

big_array=np.random.rand(100_000,3)
small_array=np.random.rand(1_000,3)
big_array[1000:2000]=small_array

%timeit find_nearest(big_array,small_array,1e-9) #find all pairs within a distance of 1e-9
#55.7 ms ± 830 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

#A. Hendry
%timeit np.argwhere(np.isclose(small_array, big_array[:, None, :]).all(-1).any(-1)).reshape(-1)
#3.24 s ± 19 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
于 2021-09-15T08:13:13.107 回答
0

感谢@AndrasDeak 这个答案

下面的代码片段

inds_with_fpe = np.argwhere(
    np.isclose(small_array_fpe, big_array[:, None, :]).all(-1).any(-1)
).reshape(-1)

将使代码工作。现在对应的输出是:

No Floating Point Error: [1 3]
With Floating Point Error: [1, 3]
Are 5.0 and 5.0+1e-9 close?: True

None在上面创建一个新轴(与 相同np.newaxis)。这会将big_array数组的形状更改为(4, 1, 3),这符合广播规则并允许np.isclose运行。也就是说,big_array现在是一组4 1 x 3点,并且由于其中一个轴big_array是 1,small_array_fpe因此可以将其广播到2 1 x 3数组(即 shape (2, 1, 3))并且可以逐元素比较元素。

结果是一个(4, 2, 3)布尔数组;的每个元素都按元素与 的每个元素进行big_array比较,small_array_fpe并返回它们接近(在特定容差内)的组件。由于all被调用为对象方法而不是 numpy 函数,因此函数的第一个参数实际上是axis输入数组而不是输入数组。因此,-1在上述函数中表示“数组的最后一个轴”。

我们首先返回(4, 2, 3)数组的所有索引True(即所有(x, y, z)分量都相等),从而产生一个4 x 2数组。其中任何一个是点相等True的相应索引,产生一个数组。big_array4 x 1

argwhere返回按元素分组的索引,所以它的形状通常是(number nonzero items, num dims of input array),因此我们将它展平为一个1d数组reshape(-1)

不幸的是,这需要与每个数组中的点数成二次方的内存,因为我们必须遍历 的每个元素big_array并对照 的每个元素检查它small_array_fpe。例如,要在一组另外 10,000 个点中搜索 10,000 个点,对于 32 位浮点数据,需要

Memory = 10000 * 10000 * 4 * 8 = 32 GiB RAM!

如果有人能设计出一个运行时间更快、内存量合理的解决方案,那就太棒了!

供参考:

from timeit import timeit

import numpy as np

big_array = np.array(
    [
        [1.0, 2.0, 1.2],
        [5.0, 3.0, 0.12],
        [-1.0, 14.0, 0.0],
        [-9.0, 0.0, 13.0],
    ],
    dtype=float,
)

small_array = np.array(
    [
        [5.0 + 1e-9, 3.0 + 1e-9, 0.12 + 1e-9],
        [10.0, 2.0, 5.8],
        [-9.0 + 1e-9, 0.0 + 1e-9, 13.0 + 1e-9],
    ],
    dtype=float,
)


def approach01():
    return [
        ndx
        for ndx, barr in enumerate(big_array)
        for sarr in small_array
        if np.all(np.isclose(sarr, barr))
    ]


def approach02():
    return np.argwhere(
        np.isclose(small_array, big_array[:, None, :]).all(-1).any(-1)
    ).reshape(-1)


if __name__ == "__main__":
    time01 = timeit(
        "approach01()",
        number=10000,
        setup="import numpy as np; from __main__ import approach01",
    )
    time02 = timeit(
        "approach02()",
        number=10000,
        setup="import numpy as np; from __main__ import approach02",
    )

    print(f"Approach 1 (List Comprehension): {time01}")
    print(f"Approach 2 (Vectorized): {time02}")

输出:

Approach 1 (List Comprehension): 8.1180582
Approach 2 (Vectorized): 0.9656997
于 2021-09-12T22:09:42.693 回答