我有一个像这样的numpy数组:[1 2 2 0 0 1 3 5]
是否可以将元素的索引作为二维数组获取?例如,上述输入的答案是[[3 4], [0 5], [1 2], [6], [], [7]]
目前我必须循环不同的值并调用numpy.where(input == i)
每个值,这在输入足够大的情况下性能很差。
我有一个像这样的numpy数组:[1 2 2 0 0 1 3 5]
是否可以将元素的索引作为二维数组获取?例如,上述输入的答案是[[3 4], [0 5], [1 2], [6], [], [7]]
目前我必须循环不同的值并调用numpy.where(input == i)
每个值,这在输入足够大的情况下性能很差。
这是使用 O(max(x)+len(x)) 的方法scipy.sparse
:
import numpy as np
from scipy import sparse
x = np.array("1 2 2 0 0 1 3 5".split(),int)
x
# array([1, 2, 2, 0, 0, 1, 3, 5])
M,N = x.max()+1,x.size
sparse.csc_matrix((x,x,np.arange(N+1)),(M,N)).tolil().rows.tolist()
# [[3, 4], [0, 5], [1, 2], [6], [], [7]]
这是通过创建一个稀疏矩阵来实现的,该矩阵的条目位于 (x[0],0), (x[1],1), ... 使用CSC
(压缩稀疏列)格式,这相当简单。然后将矩阵转换为LIL
(链表)格式。这种格式将每行的列索引作为列表存储在其rows
属性中,因此我们需要做的就是将其转换为列表。
请注意,对于基于小型阵列argsort
的解决方案可能更快,但在某些不是非常大的尺寸下,这将跨越。
编辑:
argsort
基于numpy
-only 的解决方案:
np.split(x.argsort(kind="stable"),np.bincount(x)[:-1].cumsum())
# [array([3, 4]), array([0, 5]), array([1, 2]), array([6]), array([], dtype=int64), array([7])]
如果组内索引的顺序无关紧要,您也可以尝试argpartition
(在这个小例子中恰好没有区别,但通常不能保证):
bb = np.bincount(x)[:-1].cumsum()
np.split(x.argpartition(bb),bb)
# [array([3, 4]), array([0, 5]), array([1, 2]), array([6]), array([], dtype=int64), array([7])]
编辑:
@Divakar 建议不要使用np.split
. 相反,循环可能更快:
A = x.argsort(kind="stable")
B = np.bincount(x+1).cumsum()
[A[B[i-1]:B[i]] for i in range(1,len(B))]
或者您可以使用全新的 (Python3.8+) 海象运算符:
A = x.argsort(kind="stable")
B = np.bincount(x)
L = 0
[A[L:(L:=L+b)] for b in B.tolist()]
编辑(已编辑):
(不是纯 numpy):作为 numba 的替代品(参见@senderle 的帖子),我们也可以使用 pythran。
编译pythran -O3 <filename.py>
import numpy as np
#pythran export sort_to_bins(int[:],int)
def sort_to_bins(idx, mx):
if mx==-1:
mx = idx.max() + 1
cnts = np.zeros(mx + 2, int)
for i in range(idx.size):
cnts[idx[i] + 2] += 1
for i in range(3, cnts.size):
cnts[i] += cnts[i-1]
res = np.empty_like(idx)
for i in range(idx.size):
res[cnts[idx[i]+1]] = i
cnts[idx[i]+1] += 1
return [res[cnts[i]:cnts[i+1]] for i in range(mx)]
在性能方面,这里numba
获胜:
repeat(lambda:enum_bins_numba_buffer(x),number=10)
# [0.6235917090671137, 0.6071486569708213, 0.6096088469494134]
repeat(lambda:sort_to_bins(x,-1),number=10)
# [0.6235359431011602, 0.6264424560358748, 0.6217901279451326]
较旧的东西:
import numpy as np
#pythran export bincollect(int[:])
def bincollect(a):
o = [[] for _ in range(a.max()+1)]
for i,j in enumerate(a):
o[j].append(i)
return o
Timings vs. numba(旧)
timeit(lambda:bincollect(x),number=10)
# 3.5732191529823467
timeit(lambda:enumerate_bins(x),number=10)
# 6.7462647299980745
根据您的数据大小,一种可能的选择是退出numpy
并使用collections.defaultdict
:
In [248]: from collections import defaultdict
In [249]: d = defaultdict(list)
In [250]: l = np.random.randint(0, 100, 100000)
In [251]: %%timeit
...: for k, v in enumerate(l):
...: d[v].append(k)
...:
10 loops, best of 3: 22.8 ms per loop
然后你最终得到一个字典{value1: [index1, index2, ...], value2: [index3, index4, ...]}
。时间缩放与数组的大小非常接近线性,因此 10,000,000 在我的机器上需要大约 2.7 秒,这似乎很合理。
虽然请求的是numpy
解决方案,但我决定看看是否有numba
基于有趣的解决方案。确实有!这是一种将分区列表表示为存储在单个预分配缓冲区中的不规则数组的方法。这从Paul Panzerargsort
提出的方法中获得了一些灵感。(对于表现不佳但更简单的旧版本,请参见下文。)
@numba.jit(numba.void(numba.int64[:],
numba.int64[:],
numba.int64[:]),
nopython=True)
def enum_bins_numba_buffer_inner(ints, bins, starts):
for x in range(len(ints)):
i = ints[x]
bins[starts[i]] = x
starts[i] += 1
@numba.jit(nopython=False) # Not 100% sure this does anything...
def enum_bins_numba_buffer(ints):
ends = np.bincount(ints).cumsum()
starts = np.empty(ends.shape, dtype=np.int64)
starts[1:] = ends[:-1]
starts[0] = 0
bins = np.empty(ints.shape, dtype=np.int64)
enum_bins_numba_buffer_inner(ints, bins, starts)
starts[1:] = ends[:-1]
starts[0] = 0
return [bins[s:e] for s, e in zip(starts, ends)]
这在 75 毫秒内处理了 1000 万个项目列表,这比用纯 Python 编写的基于列表的版本快了近 50 倍。
对于速度较慢但可读性更强的版本,这是我之前的版本,基于最近添加的对动态大小的“类型列表”的实验性支持,这使我们能够更快地以无序的方式填充每个 bin。
这有点与 's 类型推理引擎搏斗numba
,我相信有更好的方法来处理这部分。事实证明,这也比上述速度慢了近 10 倍。
@numba.jit(nopython=True)
def enum_bins_numba(ints):
bins = numba.typed.List()
for i in range(ints.max() + 1):
inner = numba.typed.List()
inner.append(0) # An awkward way of forcing type inference.
inner.pop()
bins.append(inner)
for x, i in enumerate(ints):
bins[i].append(x)
return bins
我对这些进行了以下测试:
def enum_bins_dict(ints):
enum_bins = defaultdict(list)
for k, v in enumerate(ints):
enum_bins[v].append(k)
return enum_bins
def enum_bins_list(ints):
enum_bins = [[] for i in range(ints.max() + 1)]
for x, i in enumerate(ints):
enum_bins[i].append(x)
return enum_bins
def enum_bins_sparse(ints):
M, N = ints.max() + 1, ints.size
return sparse.csc_matrix((ints, ints, np.arange(N + 1)),
(M, N)).tolil().rows.tolist()
我还针对类似于enum_bins_numba_buffer
(下面详细描述)的预编译 cython 版本对它们进行了测试。
在一千万个随机整数 ( ints = np.random.randint(0, 100, 10000000)
) 的列表中,我得到以下结果:
enum_bins_dict(ints)
3.71 s ± 80.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
enum_bins_list(ints)
3.28 s ± 52.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
enum_bins_sparse(ints)
1.02 s ± 34.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
enum_bins_numba(ints)
693 ms ± 5.81 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
enum_bins_cython(ints)
82.3 ms ± 1.77 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
enum_bins_numba_buffer(ints)
77.4 ms ± 2.06 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
令人印象深刻的是,即使关闭了边界检查,这种使用方式也numba
优于相同功能的版本。cython
我还没有足够的熟悉pythran
来使用它来测试这种方法,但我有兴趣看到一个比较。似乎基于这种加速,pythran
使用这种方法的版本也可能会快很多。
这是cython
供参考的版本,带有一些构建说明。安装后,您cython
将需要一个setup.py
像这样的简单文件:
from distutils.core import setup
from distutils.extension import Extension
from Cython.Build import cythonize
import numpy
ext_modules = [
Extension(
'enum_bins_cython',
['enum_bins_cython.pyx'],
)
]
setup(
ext_modules=cythonize(ext_modules),
include_dirs=[numpy.get_include()]
)
和 cython 模块,enum_bins_cython.pyx
:
# cython: language_level=3
import cython
import numpy
cimport numpy
@cython.boundscheck(False)
@cython.cdivision(True)
@cython.wraparound(False)
cdef void enum_bins_inner(long[:] ints, long[:] bins, long[:] starts) nogil:
cdef long i, x
for x in range(len(ints)):
i = ints[x]
bins[starts[i]] = x
starts[i] = starts[i] + 1
def enum_bins_cython(ints):
assert (ints >= 0).all()
# There might be a way to avoid storing two offset arrays and
# save memory, but `enum_bins_inner` modifies the input, and
# having separate lists of starts and ends is convenient for
# the final partition stage.
ends = numpy.bincount(ints).cumsum()
starts = numpy.empty(ends.shape, dtype=numpy.int64)
starts[1:] = ends[:-1]
starts[0] = 0
bins = numpy.empty(ints.shape, dtype=numpy.int64)
enum_bins_inner(ints, bins, starts)
starts[1:] = ends[:-1]
starts[0] = 0
return [bins[s:e] for s, e in zip(starts, ends)]
使用工作目录中的这两个文件,运行以下命令:
python setup.py build_ext --inplace
然后,您可以使用from enum_bins_cython import enum_bins_cython
.
这是一种非常非常奇怪的方法来做到这一点,这很糟糕,但我觉得不分享太有趣了——还有numpy
!
out = np.array([''] * (x.max() + 1), dtype = object)
np.add.at(out, x, ["{} ".format(i) for i in range(x.size)])
[[int(i) for i in o.split()] for o in out]
Out[]:
[[3, 4], [0, 5], [1, 2], [6], [], [7]]
编辑:这是我在这条路上能找到的最好的方法。它仍然比 @PaulPanzer 的argsort
解决方案慢 10 倍:
out = np.empty((x.max() + 1), dtype = object)
out[:] = [[]] * (x.max() + 1)
coords = np.empty(x.size, dtype = object)
coords[:] = [[i] for i in range(x.size)]
np.add.at(out, x, coords)
list(out)
你可以通过制作一个数字字典来做到这一点,键是数字,值应该是数字看到的索引,这是最快的方法之一,你可以看到下面的代码:
>>> import numpy as np
>>> a = np.array([1 ,2 ,2 ,0 ,0 ,1 ,3, 5])
>>> b = {}
# Creating an empty list for the numbers that exist in array a
>>> for i in range(np.min(a),np.max(a)+1):
b[str(i)] = []
# Adding indices to the corresponding key
>>> for i in range(len(a)):
b[str(a[i])].append(i)
# Resulting Dictionary
>>> b
{'0': [3, 4], '1': [0, 5], '2': [1, 2], '3': [6], '4': [], '5': [7]}
# Printing the result in the way you wanted.
>>> for i in sorted (b.keys()) :
print(b[i], end = " ")
[3, 4] [0, 5] [1, 2] [6] [] [7]
伪代码:
通过从最大值中减去 numpy 数组的最小值然后加一来获得“二维数组中的一维数组的数量”。在您的情况下,它将是 5-0+1 = 6
用其中的一维数组的数量初始化一个二维数组。在您的情况下,初始化一个包含 6 个 1d 数组的 2d 数组。每个 1d 数组对应于 numpy 数组中的一个唯一元素,例如,第一个 1d 数组将对应于“0”,第二个 1d 数组将对应于“1”,...
循环遍历您的 numpy 数组,将元素的索引放入正确的相应一维数组中。在您的情况下,numpy 数组中第一个元素的索引将被放入第二个 1d 数组,numpy 数组中第二个元素的索引将被放入第三个 1d 数组,...
此伪代码将需要线性时间来运行,因为它取决于您的 numpy 数组的长度。
这为您提供了您想要的东西,并且在我的机器上 10,000,000 大约需要 2.5 秒:
import numpy as np
import timeit
# x = np.array("1 2 2 0 0 1 3 5".split(),int)
x = np.random.randint(0, 100, 100000)
def create_index_list(x):
d = {}
max_value = -1
for i,v in enumerate(x):
if v > max_value:
max_value = v
try:
d[v].append(i)
except:
d[v] = [i]
result_list = []
for i in range(max_value+1):
if i in d:
result_list.append(d[i])
else:
result_list.append([])
return result_list
# print(create_index_list(x))
print(timeit.timeit(stmt='create_index_list(x)', number=1, globals=globals()))
因此,给定一个元素列表,您想要制作 (element, index) 对。在线性时间内,这可以这样完成:
hashtable = dict()
for idx, val in enumerate(mylist):
if val not in hashtable.keys():
hashtable[val] = list()
hashtable[val].append(idx)
newlist = sorted(hashtable.values())
这应该花费 O(n) 时间。到目前为止,我想不出更快的解决方案,但如果我这样做了,我会在这里更新。