54

我有一个形状为 (6,2) 的 Numpy 数组:

[[ 0, 1],
 [10,11],
 [20,21],
 [30,31],
 [40,41],
 [50,51]]

我需要一个具有步长1和窗口大小的滑动窗口,3如下所示:

[[ 0, 1,10,11,20,21],
 [10,11,20,21,30,31],
 [20,21,30,31,40,41],
 [30,31,40,41,50,51]]

我正在寻找一个 Numpy 解决方案。如果您的解决方案可以参数化原始数组的形状以及窗口大小和步长,那就太好了。


我找到了这个相关的答案Using strides for an Effective Moving Average Filter但我看不到如何在那里指定步长以及如何将窗口从 3d 折叠到连续的 2d 数组。还有这个滚动或滑动窗口迭代器?但那是在 Python 中,我不确定它的效率如何。此外,它支持元素,但如果每个元素都有多个特征,则最终不会将它们连接在一起。

4

8 回答 8

74

您可以使用精美的索引在 numpy 中创建矢量化滑动窗口。

>>> import numpy as np

>>> a = np.array([[00,01], [10,11], [20,21], [30,31], [40,41], [50,51]])

>>> a
array([[ 0,  1],
       [10, 11],
       [20, 21],                      #define our 2d numpy array
       [30, 31],
       [40, 41],
       [50, 51]])

>>> a = a.flatten()

>>> a
array([ 0,  1, 10, 11, 20, 21, 30, 31, 40, 41, 50, 51])    #flattened numpy array

>>> indexer = np.arange(6)[None, :] + 2*np.arange(4)[:, None]

>>> indexer
array([[ 0,  1,  2,  3,  4,  5],
       [ 2,  3,  4,  5,  6,  7],            #sliding window indices
       [ 4,  5,  6,  7,  8,  9],
       [ 6,  7,  8,  9, 10, 11]])

>>> a[indexer]
array([[ 0,  1, 10, 11, 20, 21],
       [10, 11, 20, 21, 30, 31],            #values of a over sliding window
       [20, 21, 30, 31, 40, 41],
       [30, 31, 40, 41, 50, 51]])

>>> np.sum(a[indexer], axis=1)
array([ 63, 123, 183, 243])         #sum of values in 'a' under the sliding window.

解释这段代码在做什么。

np.arange(6)[None, :]创建一个 0 到 6 的行向量,并创建np.arange(4)[:, None]一个 0 到 4 的列向量。这会产生一个 4x6 矩阵,其中每行(其中 6 个)代表一个窗口,行数(其中 4 个)代表视窗。2 的倍数使滑动窗口一次滑动 2 个单位,这是在每个元组上滑动所必需的。使用 numpy 数组切片,您可以将滑动窗口传递到展平的 numpy 数组中,并像 sum 一样对它们进行聚合。

于 2017-02-15T19:18:41.400 回答
38
In [1]: import numpy as np

In [2]: a = np.array([[00,01], [10,11], [20,21], [30,31], [40,41], [50,51]])

In [3]: w = np.hstack((a[:-2],a[1:-1],a[2:]))

In [4]: w
Out[4]: 
array([[ 0,  1, 10, 11, 20, 21],
       [10, 11, 20, 21, 30, 31],
       [20, 21, 30, 31, 40, 41],
       [30, 31, 40, 41, 50, 51]])

你可以把它写成一个函数:

def window_stack(a, stepsize=1, width=3):
    n = a.shape[0]
    return np.hstack( a[i:1+n+i-width:stepsize] for i in range(0,width) )

这并不真正取决于原始数组的形状,只要a.ndim = 2. 请注意,我从不在交互式版本中使用任何一种长度。形状的第二维无关紧要;每一行可以任意长。感谢@Jaime 的建议,您可以在不检查形状的情况下做到这一点:

def window_stack(a, stepsize=1, width=3):
    return np.hstack( a[i:1+i-width or None:stepsize] for i in range(0,width) )
于 2013-03-30T19:22:31.720 回答
33

一种解决方案是

np.lib.stride_tricks.as_strided(a, shape=(4,6), strides=(8,4)).

当您开始考虑指针/地址时,使用 strides 是很直观的。

as_strided()方法有 3 个参数。

  1. 数据
  2. 形状
  3. 大步前进

data是我们要操作的数组。

as_strided()用于实现滑动窗口函数,我们必须事先计算输出的形状。在问题中,(4,6) 是输出的形状。如果尺寸不正确,我们最终会读取垃圾值。这是因为我们通过将指针移动几个字节(取决于数据类型)来访问数据。

确定正确的值strides对于获得预期结果至关重要。在计算步幅之前,使用 找出每个元素占用的内存arr.strides[-1]。在本例中,一个元素占用的内存为 4 个字节。Numpy 数组以行主要方式创建。下一行的第一个元素紧邻当前行的最后一个元素。

前任:

0 , 1 | 10, 11 | ...

10 就在 1 旁边。

想象一下将 2D 数组重新整形为 1D(这是可以接受的,因为数据以行主要格式存储)。输出中每一行的第一个元素是一维数组中的奇数索引元素。

0, 10, 20, 30, ..

因此,从 0 到 10、10 到 20 等等,我们需要在内存中采取的步数是2 * mem size of element。每行的步幅为2 * 4bytes = 8. 对于输出中的给定行,所有元素在我们想象的一维数组中彼此相邻。要获取一行中的下一个元素,只需迈出一个等于元素大小的步幅。列步幅的值为 4 个字节。

所以,strides=(8,4)

另一种解释:输出的形状为 (4,6)。列步幅4。因此,第一行元素从索引开始0,有 6 个元素,每个元素间隔 4 个字节。收集完第一行后,第二行开始距离当前行开头 8 个字节。第三行开始距离第二行起点 8 个字节,依此类推。

形状决定了我们需要的行数和列数。strides 定义了开始行和收集列元素的内存步骤

于 2017-09-13T13:35:13.210 回答
9

使用1可以进行简短的列表理解:more_itertools.windowed

给定

import numpy as np
import more_itertools as mit


a = [["00","01"],
     ["10","11"],
     ["20","21"],
     ["30","31"],
     ["40","41"],
     ["50","51"]]

b = np.array(a)

代码

np.array([list(mit.flatten(w)) for w in mit.windowed(a, n=3)])

或者

np.array([[i for item in w for i in item] for w in mit.windowed(a, n=3)])

或者

np.array(list(mit.windowed(b.ravel(), n=6)))

输出

array([['00', '01', '10', '11', '20', '21'],
       ['10', '11', '20', '21', '30', '31'],
       ['20', '21', '30', '31', '40', '41'],
       ['30', '31', '40', '41', '50', '51']], 
      dtype='<U2')

n=3创建并展平大小的滑动窗口。请注意,默认步长为more_itertools.windowed(..., step=1).


表现

作为一个数组,接受的答案是最快的。

%timeit np.hstack((a[:-2], a[1:-1], a[2:]))
# 37.5 µs ± 1.88 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
%timeit np.hstack((b[:-2], b[1:-1], b[2:]))
# 12.9 µs ± 166 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%timeit np.array([list(mit.flatten(w)) for w in mit.windowed(a, n=3)])
# 23.2 µs ± 1.73 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
%timeit np.array([[i for item in w for i in item] for w in mit.windowed(a, n=3)])
# 21.2 µs ± 999 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
%timeit np.array(list(mit.windowed(b.ravel(), n=6)))
# 43.4 µs ± 374 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

实现itertool 配方和许多有用工具的第三方库。

于 2017-10-12T00:44:56.027 回答
8

从 开始Numpy 1.20,使用 newsliding_window_view滑动/滚动元素窗口,并基于与user42541 的答案相同的想法,我们可以这样做:

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

# values = np.array([[0,1], [10,11], [20,21], [30,31], [40,41], [50,51]])
sliding_window_view(values.flatten(), window_shape = 2*3)[::2]
# array([[ 0,  1, 10, 11, 20, 21],
#        [10, 11, 20, 21, 30, 31],
#        [20, 21, 30, 31, 40, 41],
#        [30, 31, 40, 41, 50, 51]])

其中2是子数组和3窗口的大小。


中间步骤的详细信息:

# values = np.array([[0,1], [10,11], [20,21], [30,31], [40,41], [50,51]])

# Flatten the array (concatenate sub-arrays):
values.flatten()
# array([ 0,  1, 10, 11, 20, 21, 30, 31, 40, 41, 50, 51])

# Slide through windows of size 2*3=6:
sliding_window_view(values.flatten(), 2*3)
# array([[ 0,  1, 10, 11, 20, 21],
#        [ 1, 10, 11, 20, 21, 30],
#        [10, 11, 20, 21, 30, 31],
#        [11, 20, 21, 30, 31, 40],
#        [20, 21, 30, 31, 40, 41],
#        [21, 30, 31, 40, 41, 50],
#        [30, 31, 40, 41, 50, 51]])

# Only keep even rows (1 row in 2 - if sub-arrays have a size of x, then replace 2 with x):
sliding_window_view(values.flatten(), 2*3)[::2]
# array([[ 0,  1, 10, 11, 20, 21],
#        [10, 11, 20, 21, 30, 31],
#        [20, 21, 30, 31, 40, 41],
#        [30, 31, 40, 41, 50, 51]])
于 2020-12-25T10:24:10.460 回答
3

从 NumPy 版本开始,1.20.0这可以使用

np.lib.stride_tricks.sliding_window_view(arr, winsize)

例子:

>>> arr = np.arange(0, 9).reshape((3, 3))
>>> np.lib.stride_tricks.sliding_window_view(arr, (2, 2))

array([[[[0, 1],
         [3, 4]],

        [[1, 2],
         [4, 5]]],


       [[[3, 4],
         [6, 7]],

        [[4, 5],
         [7, 8]]]])

你可以在这里阅读更多关于它的信息。

于 2021-01-27T16:07:17.757 回答
2

这是使用 Numpy >= v1.17 的单线

rowsJoined = 3

splits = np.vstack(np.split(x,np.array([[i, i + rowsJoined] for i in range(x.shape[0] - (rowsJoined - 1))]).reshape(-1))).reshape(-1, rowsJoined * x.shape[1]) 

测试

x = np.array([[00,1],
              [10,11],
              [20,21],
              [30,31],
              [40,41],
              [50,51]])

结果

[[ 0  1 10 11 20 21]
 [10 11 20 21 30 31]
 [20 21 30 31 40 41]
 [30 31 40 41 50 51]]

大型阵列上的测试性能

import numpy as np
import time

x = np.array(range(1000)).reshape(-1, 2)
rowsJoined = 3

all_t = 0.
for i in range(1000):
    start_ = time.time()
    np.vstack(
        numpy.split(x,np.array([[i, i + rowsJoined] for i in range(x.shape[0] - (rowsJoined - 1))])
                    .reshape(-1))).reshape(-1, rowsJoined * x.shape[1])
    all_t += time.time() - start_

print('Average Time of 1000 Iterations on Array of Shape '
      '1000 x 2 is: {} Seconds.'.format(all_t/1000.))

表现结果

Average Time of 1000 Iterations on Array of Shape 1000 x 2 is: 0.0016909 Seconds.
于 2019-10-03T10:48:22.680 回答
0

这是一个纯 Python 实现:

def sliding_window(arr, window=3):
    i = iter(arr)
    a = []
    for e in range(0, window): a.append(next(i))
    yield a
    for e in i:
        a = a[1:] + [e]
        yield a

一个例子:

# flatten array
flatten = lambda l: [item for sublist in l for item in sublist]

a = [[0,1], [10,11], [20,21], [30,31], [40,41], [50,51]]
w = sliding_window(a, width=3)
print( list(map(flatten,w)) )

[[0, 1, 10, 11, 20, 21], [10, 11, 20, 21, 30, 31], [20, 21, 30, 31, 40, 41], [30, 31, 40, 41, 50, 51]]

基准

import timeit
def benchmark():
  a = [[0,1], [10,11], [20,21], [30,31], [40,41], [50,51]]
  sliding_window(a, width=3)

times = timeit.Timer(benchmark).repeat(3, number=1000)
time_taken = min(times) / 1000
print(time_taken)

1.0944640007437556e-06
于 2019-05-17T13:22:31.963 回答