其中一部分是根据Nelson 规则和Western Electric 规则检查数据系列。
例如(尼尔森规则中的规则 2):检查连续九个(或更多)点是否在均值的同一侧。
现在我可以通过遍历数组来简单地检查这样的规则。
- 但在我这样做之前,我在这里检查一下,如果 numpy/pandas 有办法在不迭代的情况下做到这一点?
- 无论如何:实现上述检查的“numpy-ic”方法是什么?
import numpy as np
x = np.random.rand(100)
f = np.sign(x - x.mean())
c = np.cumsum(f)
d = c[9:] - c[:-9]
print np.max(d), np.min(d)
如果 np.max(d) == 9 或 np.min(d) == -9 则连续有九个(或更多)点位于均值的同一侧。
或者您可以使用以下代码来计算每一行的长度:
np.diff(np.where(np.diff(np.r_[-2,f,-2]))[0])
给定data
和 minimum length
,您可以检查数组是否
np.diff(np.cumsum(np.sign(data - np.mean(data))), length)
包含零。
另一种可能性:使用相关或卷积
>>> a = np.random.randn(50)
>>> b = (a - a.mean()) > 0
>>> b.astype(int)
array([0, 1, 1, 0, 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, 1, 0, 0, 0, 0, 1, 1, 1, 1,
1, 1, 1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 0, 0, 1, 1, 0,
1, 1, 1, 1])
>>> c = np.correlate(b, np.ones(3), mode='valid')
>>> c
array([ 2., 2., 1., 1., 1., 1., 0., 0., 1., 2., 3., 2., 2.,
1., 1., 0., 0., 1., 2., 3., 3., 3., 3., 3., 2., 2.,
2., 2., 2., 1., 1., 1., 1., 2., 1., 2., 2., 2., 1.,
0., 0., 1., 2., 2., 2., 2., 3., 3.])
>>> c.max() == 3
True
>>> c.min() == 0
True
它会比 HYRY cumsum 版本慢。
旁白:在 statsmodels 中有一个用于测试类似运行的运行测试
正如我在评论中提到的,您可能想尝试使用一些跨步技巧。
首先,让我们创建一个异常大小的数组:我们可以把它当作np.int8
节省一些空间
anomalies = x - x.mean()
signs = np.sign(anomalies).astype(np.int8)
现在大步前进。如果要考虑N
连续点,您将使用
from np.lib.stride_tricks import as_strided
strided = as_strided(signs,
strides=(signs.itemsize,signs.itemsize),
shape=(signs.shape,N))
这给了我们一个(x.size, N)
rollin 数组:第一行是x[0:N]
,第二行x[1:N+1]
……当然,最后N-1
一行是没有意义的,所以从现在开始我们将使用
strided = strided[:-N+1]
让我们沿行求和
consecutives = strided.sum(axis=-1)
这给了我们一个介于和(x.size-N+1)
之间的值大小的数组:我们只需要找到绝对值在哪里:-N
+N
N
(indices,) = np.nonzero(consecutives == N)
indices
是数组的索引i
数组x
,其值x[i:i+N]
位于均值的同一侧...
x=np.random.rand(10)
带有和的示例N=3
>>> x = array([ 0.57016436, 0.79360943, 0.89535982, 0.83632245, 0.31046202,
0.91398363, 0.62358298, 0.72148491, 0.99311681, 0.94852957])
>>> signs = np.sign(x-x.mean()).astype(np.int8)
array([-1, 1, 1, 1, -1, 1, -1, -1, 1, 1], dtype=int8)
>>> strided = as_strided(signs,strides=(1,1),shape=(signs.size,3))
array([[ -1, 1, 1],
[ 1, 1, 1],
[ 1, 1, -1],
[ 1, -1, 1],
[ -1, 1, -1],
[ 1, -1, -1],
[ -1, -1, 1],
[ -1, 1, 1],
[ 1, 1, -106],
[ 1, -106, -44]], dtype=int8)
>>> consecutive=strided[:-N+1].sum(axis=-1)
array([ 1, 3, 1, 1, -1, -1, -1, 1])
>>> np.nonzero(np.abs(consecutive)==N)
(array([1]),)