I want to implement itertools.combinations for numpy. Based on this discussion, I have a function that works for 1-D input:

from itertools import combinations
from numpy import asarray, dtype, fromiter

def combs(a, r):
    """
    Return successive r-length combinations of elements in the array a.
    Should produce the same output as array(list(combinations(a, r))), but
    faster.
    """
    a = asarray(a)
    dt = dtype([('', a.dtype)]*r)
    b = fromiter(combinations(a, r), dt)
    return b.view(a.dtype).reshape(-1, r)

and the output makes sense:

In [1]: list(combinations([1,2,3], 2))
Out[1]: [(1, 2), (1, 3), (2, 3)]

In [2]: array(list(combinations([1,2,3], 2)))
Out[2]: 
array([[1, 2],
       [1, 3],
       [2, 3]])

In [3]: combs([1,2,3], 2)
Out[3]: 
array([[1, 2],
       [1, 3],
       [2, 3]])

But it would be best if I could extend it to N-D inputs, where the additional dimensions simply let you quickly do multiple calls at once. So, conceptually, if combs([1, 2, 3], 2) produces [1, 2], [1, 3], [2, 3] and combs([4, 5, 6], 2) produces [4, 5], [4, 6], [5, 6], then combs((1,2,3) and (4,5,6), 2) should produce [1, 2], [1, 3], [2, 3] and [4, 5], [4, 6], [5, 6], where "and" just represents parallel rows or columns (whichever makes sense). (And likewise for additional dimensions.)
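
For concreteness, here is a slow pure-Python reference of the behaviour I have in mind, combining along the last axis with every leading axis treated as an independent "parallel" call (this is only meant to pin down the expected output, not to be the implementation):

import numpy as np
from itertools import combinations

def combs_reference(a, r):
    # Slow reference: 1-D combinations along the last axis, with every
    # leading axis treated as an independent "parallel" call.
    a = np.asarray(a)
    flat = a.reshape(-1, a.shape[-1])
    out = np.array([list(combinations(row, r)) for row in flat])
    return out.reshape(a.shape[:-1] + out.shape[1:])

combs_reference([[1, 2, 3], [4, 5, 6]], 2)
# array([[[1, 2],
#         [1, 3],
#         [2, 3]],
#
#        [[4, 5],
#         [4, 6],
#         [5, 6]]])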

I'm not sure about:

  1. How to make the dimensions work in a way that is logically consistent with how other functions work (e.g. some numpy functions have an axis= parameter and default to axis 0; so maybe axis 0 should be the axis I'm combining along, and all the other axes just represent parallel calculations?)
  2. How to make the above code work with N-D input (right now I get ValueError: setting an array element with a sequence.)
  3. Is there a better way to do dt = dtype([('', a.dtype)]*r)? (See the sketch just after this list.)
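
For item 3, one possible alternative, assuming NumPy 1.23 or newer (where np.fromiter accepts subarray dtypes), is to drop the structured dtype and the .view() step entirely:

import numpy as np
from itertools import combinations
from math import comb

def combs_subarray(a, r):
    # Relies on fromiter's subarray-dtype support (NumPy >= 1.23), so no
    # structured dtype and no .view()/reshape trick is needed.
    a = np.asarray(a)
    return np.fromiter(combinations(a, r),
                       dtype=np.dtype((a.dtype, r)),
                       count=comb(len(a), r))

combs_subarray([1, 2, 3], 2)
# array([[1, 2],
#        [1, 3],
#        [2, 3]])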

4 Answers

You can use itertools.combinations() to create the index array, and then use NumPy's fancy indexing:

import numpy as np
from itertools import combinations, chain
from scipy.special import comb

def comb_index(n, k):
    count = comb(n, k, exact=True)
    index = np.fromiter(chain.from_iterable(combinations(range(n), k)), 
                        int, count=count*k)
    return index.reshape(-1, k)

data = np.array([[1,2,3,4,5],[10,11,12,13,14]])

idx = comb_index(5, 3)
print(data[:, idx])

Output:

[[[ 1  2  3]
  [ 1  2  4]
  [ 1  2  5]
  [ 1  3  4]
  [ 1  3  5]
  [ 1  4  5]
  [ 2  3  4]
  [ 2  3  5]
  [ 2  4  5]
  [ 3  4  5]]

 [[10 11 12]
  [10 11 13]
  [10 11 14]
  [10 12 13]
  [10 12 14]
  [10 13 14]
  [11 12 13]
  [11 12 14]
  [11 13 14]
  [12 13 14]]]
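
The same index array also generalizes to other axes; as a small follow-up sketch, np.take gives an equivalent, axis-explicit spelling of the fancy indexing above:

# Equivalent to data[:, idx], but with the axis made explicit.
print(np.array_equal(np.take(data, idx, axis=1), data[:, idx]))  # True
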
Answered on 2013-04-15T06:03:56.273

When r = k = 2, you can also use numpy.triu_indices(n, 1), which indexes the upper triangle of a matrix.

idx = comb_index(5, 2)

from HYRY's answer is equivalent to

idx = np.transpose(np.triu_indices(5, 1))

but it is built in, and it is a few times faster for N above ~20:

timeit comb_index(1000, 2)
32.3 ms ± 443 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

timeit np.transpose(np.triu_indices(1000, 1))
10.2 ms ± 25.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
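
As a quick equivalence check (assuming comb_index from the answer above is in scope), both produce the C(n, 2) index pairs in the same lexicographic order:

n = 1000
assert np.array_equal(comb_index(n, 2), np.transpose(np.triu_indices(n, 1)))
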
Answered on 2018-03-22T15:10:36.420

Case k = 2: np.triu_indices

I have tested the k = 2 case with many variations of the functions mentioned above, using perfplot. The winner is, without doubt, np.triu_indices, and I now see that using the np.dtype([('', np.intp)] * 2) data structure can be a huge boost, even for exotic data types such as igraph.EdgeList.

import numpy as np
import matplotlib.pyplot as plt
import perfplot
from itertools import combinations, chain
from scipy.special import comb
import igraph as ig    # graph library built on C
import networkx as nx  # graph library, pure Python

def _combs(n):
    return np.array(list(combinations(range(n),2)))

def _combs_fromiter(n): #@Jaime
    indices = np.arange(n)
    dt = np.dtype([('', np.intp)]*2)
    indices = np.fromiter(combinations(indices, 2), dt)
    indices = indices.view(np.intp).reshape(-1, 2)
    return indices

def _combs_fromiterplus(n):
    dt = np.dtype([('', np.intp)]*2)
    indices = np.fromiter(combinations(range(n), 2), dt)
    indices = indices.view(np.intp).reshape(-1, 2)
    return indices

def _numpy(n): #@endolith
    return np.transpose(np.triu_indices(n,1))

def _igraph(n):
    return np.array(ig.Graph(n).complementer(False).get_edgelist())

def _igraph_fromiter(n):
    dt = np.dtype([('', np.intp)]*2)
    indices = np.fromiter(ig.Graph(n).complementer(False).get_edgelist(), dt)
    indices = indices.view(np.intp).reshape(-1, 2)
    return indices
    
def _nx(n):
    G = nx.Graph()
    G.add_nodes_from(range(n))
    return np.array(list(nx.complement(G).edges))

def _nx_fromiter(n):
    G = nx.Graph()
    G.add_nodes_from(range(n))
    dt = np.dtype([('', np.intp)]*2)
    indices = np.fromiter(nx.complement(G).edges, dt)
    indices = indices.view(np.intp).reshape(-1, 2)
    return indices

def _comb_index(n): #@HYRY
    count = comb(n, 2, exact=True)
    index = np.fromiter(chain.from_iterable(combinations(range(n), 2)), 
                        int, count=count*2)
    return index.reshape(-1, 2)

        
fig = plt.figure(figsize=(15, 10))
plt.grid(True, which="both")
out = perfplot.bench(
        setup = lambda x: x,
        kernels = [_numpy, _combs, _combs_fromiter, _combs_fromiterplus, 
                   _comb_index, _igraph, _igraph_fromiter, _nx, _nx_fromiter],
        n_range = [2 ** k for k in range(12)],
        xlabel = 'combinations(n, 2)',
        title = 'testing combinations',
        show_progress = False,
        equality_check = False)
out.show()

[perfplot results: 'testing combinations', runtime vs n for the kernels above]

I wonder why np.triu_indices can't be extended to more dimensions?

Case 2 ≤ k ≤ 4: triu_indices (implemented here) = up to 2x speedup

np.triu_indices could actually be the winner for the case k = 3, and even for k = 4, if we implement a generalised method instead. The current version of that method is equivalent to:

def triu_indices(n, k):
    x = np.less.outer(np.arange(n), np.arange(-k+1, n-k+1))
    return np.nonzero(x)

It builds a matrix representation of the relation x < y for two sequences 0, 1, ..., n-1 and finds the locations of the cells where it is nonzero. For the 3D case we need to add an extra dimension and intersect the relations x < y and y < z. For higher dimensions the procedure is the same, but this causes a huge memory overhead, since n^k binary cells are needed and only C(n, k) of them are True. Memory usage and running time therefore blow up on the order of n^k, so this algorithm outperforms itertools.combinations only for small values of k. It is best used for the cases k=2 and k=3:

def C(n, k): #huge memory overload...
    if k==0:
        return np.array([])
    if k==1:
        return np.arange(1,n+1)
    elif k==2:
        return np.less.outer(np.arange(n), np.arange(n))
    else:
        x = C(n, k-1)
        X = np.repeat(x[None, :, :], len(x), axis=0)
        Y = np.repeat(x[:, :, None], len(x), axis=2)
        return X&Y

def C_indices(n, k):
    return np.transpose(np.nonzero(C(n,k)))
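
A quick sanity check, assuming C and C_indices above are in scope: for small n the boolean-mask construction reproduces itertools.combinations exactly, including the lexicographic order:

import numpy as np
from itertools import combinations

print(np.array_equal(C_indices(6, 3),
                     np.array(list(combinations(range(6), 3)))))  # True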

Let's check it out with perfplot:

import matplotlib.pyplot as plt
import numpy as np
import perfplot
from itertools import chain, combinations
from scipy.special import comb

def C(n, k):  # huge memory overload...
    if k == 0:
        return np.array([])
    if k == 1:
        return np.arange(1, n + 1)
    elif k == 2:
        return np.less.outer(np.arange(n), np.arange(n))
    else:
        x = C(n, k - 1)
        X = np.repeat(x[None, :, :], len(x), axis=0)
        Y = np.repeat(x[:, :, None], len(x), axis=2)
        return X & Y

def C_indices(data):
    n, k = data
    return np.transpose(np.nonzero(C(n, k)))

def comb_index(data):
    n, k = data
    count = comb(n, k, exact=True)
    index = np.fromiter(chain.from_iterable(combinations(range(n), k)),
                        int, count=count * k)
    return index.reshape(-1, k)

def build_args(k):
    return {'setup': lambda x: (x, k),
            'kernels': [comb_index, C_indices],
            'n_range': [2 ** x for x in range(2, {2: 10, 3:10, 4:7, 5:6}[k])],
            'xlabel': f'N',
            'title': f'test of case C(N,{k})',
            'show_progress': True,
            'equality_check': lambda x, y: np.array_equal(x, y)}

outs = [perfplot.bench(**build_args(n)) for n in (2, 3, 4, 5)]
fig = plt.figure(figsize=(20, 20))
for i in range(len(outs)):
    ax = fig.add_subplot(2, 2, i + 1)
    ax.grid(True, which="both")
    outs[i].plot()
plt.show()

[perfplot results: 'test of case C(N,k)' for k = 2, 3, 4, 5, comparing comb_index and C_indices]

So the best performance boost is achieved for k=2 (where it is equivalent to np.triu_indices), and for k=3 it is almost twice as fast.

Case k > 3: numpy_combinations (implemented here) = up to 2.5x speedup

Following this question (thanks @Divakar), I managed to find a way to calculate the values of a specific column based on the previous column and Pascal's triangle. It is not yet optimized as much as it could be, but the results are really promising. Here we go:

import numpy as np
from scipy.linalg import pascal

def stretch(a, k):
    l = a.sum()+len(a)*(-k)
    out = np.full(l, -1, dtype=int)
    out[0] = a[0]-1
    idx = (a-k).cumsum()[:-1]
    out[idx] = a[1:]-1-k
    return out.cumsum()

def numpy_combinations(n, k):
    #n, k = data  # benchmark version
    x = np.array([n])
    P = pascal(n).astype(int)
    C = []
    for b in range(k-1,-1,-1):
        x = stretch(x, b)
        r = P[b][x - b]
        C.append(np.repeat(x, r))
    return n - 1 - np.array(C).T
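
A quick sanity check, assuming stretch and numpy_combinations above are in scope: for small inputs this Pascal-triangle construction reproduces itertools.combinations over range(n), in the same lexicographic order:

import numpy as np
from itertools import combinations

print(np.array_equal(numpy_combinations(5, 3),
                     np.array(list(combinations(range(5), 3)))))  # True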

The benchmark results are:

# script is the same as in previous example except this part
def build_args(k):
    return {'setup': lambda x: (k, x),
            'kernels': [comb_index, numpy_combinations],
            'n_range': [x for x in range(1, k)],
            'xlabel': f'N',
            'title': f'test of case C({k}, k)',
            'show_progress': True,
            'equality_check': False}
outs = [perfplot.bench(**build_args(n)) for n in (12, 15, 17, 23, 25, 28)]
fig = plt.figure(figsize=(20, 20))
for i in range(len(outs)):
    ax = fig.add_subplot(2, 3, i + 1)
    ax.grid(True, which="both")
    outs[i].plot()
plt.show()

[perfplot results: cases C(12, k) through C(28, k), comparing comb_index and numpy_combinations]

Although it still can't beat itertools.combinations for n < 15, it is the new winner in the other cases. Last but not least, numpy shows its power when the number of combinations gets really big: it was able to survive processing the C(28, 14) combinations, which is around 40,000,000 items of size 14.
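
For scale, a rough back-of-the-envelope estimate of that index array, assuming int64 indices:

from math import comb

comb(28, 14)           # 40116600 combinations
comb(28, 14) * 14 * 8  # 4493059200 bytes, i.e. roughly 4.5 GB of indices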

Answered on 2020-09-01T20:01:25.010

Not sure how it will work out performance-wise, but you can do the combinations on an index array, then extract the actual array slices with np.take:

import numpy as np
from itertools import combinations

def combs_nd(a, r, axis=0):
    a = np.asarray(a)
    if axis < 0:
        axis += a.ndim
    indices = np.arange(a.shape[axis])
    dt = np.dtype([('', np.intp)]*r)
    indices = np.fromiter(combinations(indices, r), dt)
    indices = indices.view(np.intp).reshape(-1, r)
    return np.take(a, indices, axis=axis)

>>> combs_nd([1,2,3], 2)
array([[1, 2],
       [1, 3],
       [2, 3]])
>>> combs_nd([[1,2,3],[4,5,6]], 2, axis=1)
array([[[1, 2],
        [1, 3],
        [2, 3]],

       [[4, 5],
        [4, 6],
        [5, 6]]])
Answered on 2013-04-15T05:52:26.757