10

我在下面包含的python中执行嵌套循环。这是搜索现有金融时间序列并在时间序列中查找与某些特征匹配的时段的基本方法。

在这种情况下,有两个独立的、大小相同的数组表示“收盘价”(即资产的价格)和“交易量”(即在此期间交换的资产数量)。对于每个时间段,我希望查看长度在 1 之间的所有未来间隔,INTERVAL_LENGTH并查看这些间隔中的任何一个是否具有与我的搜索匹配的特征(在这种情况下,收盘值的比率大于 1.0001 并且小于1.5 且总体积大于 100)。

我的理解是,使用 NumPy 时加速的主要原因之一是,只要您在整个数组上进行操作,解释器就不需要在每次评估某些内容时对操作数进行类型检查(例如numpy_array * 2) ,但显然下面的代码没有利用这一点。

有没有办法用某种可能导致加速的窗口函数替换内部循环,或者使用numpy/scipy以在本机 python 中大幅加速它的任何其他方式?

或者,一般来说有没有更好的方法来做到这一点(例如,用 C++ 编写这个循环并使用 weave 会快得多)?

ARRAY_LENGTH = 500000
INTERVAL_LENGTH = 15
close = np.array( xrange(ARRAY_LENGTH) )
volume = np.array( xrange(ARRAY_LENGTH) )
close, volume = close.astype('float64'), volume.astype('float64')

results = []
for i in xrange(len(close) - INTERVAL_LENGTH):
    for j in xrange(i+1, i+INTERVAL_LENGTH):
        ret = close[j] / close[i]
        vol = sum( volume[i+1:j+1] )
        if ret > 1.0001 and ret < 1.5 and vol > 100:
            results.append( [i, j, ret, vol] )
print results
4

3 回答 3

7

更新:(几乎)在“new_function2”下面的完全矢量化版本......

我将添加评论以稍微解释一下。

它提供了约 50 倍的加速,如果您可以接受输出是 numpy 数组而不是列表,则可以实现更大的加速。原样:

In [86]: %timeit new_function2(close, volume, INTERVAL_LENGTH)
1 loops, best of 3: 1.15 s per loop

您可以通过调用 np.cumsum() 来替换您的内部循环...请参阅下面的“new_function”函数。这提供了相当大的加速...

In [61]: %timeit new_function(close, volume, INTERVAL_LENGTH)
1 loops, best of 3: 15.7 s per loop

对比

In [62]: %timeit old_function(close, volume, INTERVAL_LENGTH)
1 loops, best of 3: 53.1 s per loop

应该可以对整个事物进行矢量化并完全避免 for 循环,不过……给我一分钟,我会看看我能做什么……

import numpy as np

ARRAY_LENGTH = 500000
INTERVAL_LENGTH = 15
close = np.arange(ARRAY_LENGTH, dtype=np.float)
volume = np.arange(ARRAY_LENGTH, dtype=np.float)

def old_function(close, volume, INTERVAL_LENGTH):
    results = []
    for i in xrange(len(close) - INTERVAL_LENGTH):
        for j in xrange(i+1, i+INTERVAL_LENGTH):
            ret = close[j] / close[i]
            vol = sum( volume[i+1:j+1] )
            if (ret > 1.0001) and (ret < 1.5) and (vol > 100):
                results.append( (i, j, ret, vol) )
    return results


def new_function(close, volume, INTERVAL_LENGTH):
    results = []
    for i in xrange(close.size - INTERVAL_LENGTH):
        vol = volume[i+1:i+INTERVAL_LENGTH].cumsum()
        ret = close[i+1:i+INTERVAL_LENGTH] / close[i]

        filter = (ret > 1.0001) & (ret < 1.5) & (vol > 100)
        j = np.arange(i+1, i+INTERVAL_LENGTH)[filter]

        tmp_results = zip(j.size * [i], j, ret[filter], vol[filter])
        results.extend(tmp_results)
    return results

def new_function2(close, volume, INTERVAL_LENGTH):
    vol, ret = [], []
    I, J = [], []
    for k in xrange(1, INTERVAL_LENGTH):
        start = k
        end = volume.size - INTERVAL_LENGTH + k
        vol.append(volume[start:end])
        ret.append(close[start:end])
        J.append(np.arange(start, end))
        I.append(np.arange(volume.size - INTERVAL_LENGTH))

    vol = np.vstack(vol)
    ret = np.vstack(ret)
    J = np.vstack(J)
    I = np.vstack(I)

    vol = vol.cumsum(axis=0)
    ret = ret / close[:-INTERVAL_LENGTH]

    filter = (ret > 1.0001) & (ret < 1.5) & (vol > 100)

    vol = vol[filter]
    ret = ret[filter]
    I = I[filter]
    J = J[filter]

    output = zip(I.flat,J.flat,ret.flat,vol.flat)
    return output

results = old_function(close, volume, INTERVAL_LENGTH)
results2 = new_function(close, volume, INTERVAL_LENGTH)
results3 = new_function(close, volume, INTERVAL_LENGTH)

# Using sets to compare, as the output 
# is in a different order than the original function
print set(results) == set(results2)
print set(results) == set(results3)
于 2010-06-18T02:05:59.787 回答
3

一种加速方法是删除该sum部分,因为在此实现中,它对长度为 2 到 的列表求和INTERVAL_LENGTH。相反,只需volume[j+1]从循环的最后一次迭代中添加 vol 的先前结果。因此,您每次只需添加两个整数,而不是对整个列表求和并每次对其进行切片。另外,不要从做开始sum(volume[i+1:j+1]),而是做vol = volume[i+1] + volume[j+1],因为你知道这里的初始情况总是只有两个索引。

另一个加速是使用.extend而不是.append,因为 python 实现的extend运行速度明显更快。

您还可以分解最终if语句,以便仅在需要时进行某些计算。例如,您知道if vol <= 100,您不需要计算ret.

这并不能完全回答您的问题,但我认为尤其是总和问题,您应该会看到这些更改的显着加速。

编辑 - 您也不需要len,因为您已经明确知道列表的长度(除非这只是为了示例)。将其定义为数字而不是len(something)总是更快。

编辑 - 实现(这是未经测试的):

ARRAY_LENGTH = 500000
INTERVAL_LENGTH = 15
close = np.array( xrange(ARRAY_LENGTH) )
volume = np.array( xrange(ARRAY_LENGTH) )
close, volume = close.astype('float64'), volume.astype('float64')

results = []
ex = results.extend
for i in xrange(ARRAY_LENGTH - INTERVAL_LENGTH):
    vol = volume[i+1]
    for j in xrange(i+1, i+INTERVAL_LENGTH):
        vol += volume[j+1]
        if vol > 100:
            ret = close[j] / close[i]
            if 1.0001 < ret < 1.5:
                ex( [i, j, ret, vol] )
print results
于 2010-06-17T21:22:51.120 回答
1

为什么不尝试将结果生成为单个列表(比附加或扩展快得多),例如:

results = [ t for t in ( (i, j, close[j]/close[i], sum(volume[i+1:j+1]))
                         for i in xrange(len(close)-INT_LEN)
                             for j in xrange(i+1, i+INT_LEN)
                       )
            if t[3] > 100 and 1.0001 < t[2] < 1.5
          ]
于 2010-06-18T04:32:46.767 回答