2010-06-17 217 views
9

我在下面包含的python中执行嵌套循环。这是搜索现有金融时间序列并寻找符合某些特征的时间序列的基本方式。在这种情况下,有两个单独的同样大小的数组表示“关闭”(即资产价格)和“交易量”(即在此期间交换的资产量)。对于每个时间段,我想期待在未来所有的间隔长度1和INTERVAL_LENGTH之间并看看是否有这些间隔有符合我搜索(在这种情况下的特性密切值的比例比1.0001少较大超过1.5并且总体积大于100)。如何加速python嵌套循环?

我的理解是,使用NumPy时加速的主要原因之一是解释器不需要在每次评估某些东西时键入检查操作数,只要您将操作数组作为整个(例如numpy_array * 2),但显然下面的代码没有利用这一点。有没有办法用某种可能导致加速的窗口函数替代内部循环,或者使用numpy/scipy以其他方式在本地python中大幅加速?

或者,有没有更好的方法来做到这一点(例如,在C++中编写此循环并使用编织会更快)?

ARRAY_LENGTH = 500000 
INTERVAL_LENGTH = 15 
close = np.array(xrange(ARRAY_LENGTH)) 
volume = np.array(xrange(ARRAY_LENGTH)) 
close, volume = close.astype('float64'), volume.astype('float64') 

results = [] 
for i in xrange(len(close) - INTERVAL_LENGTH): 
    for j in xrange(i+1, i+INTERVAL_LENGTH): 
     ret = close[j]/close[i] 
     vol = sum(volume[i+1:j+1]) 
     if ret > 1.0001 and ret < 1.5 and vol > 100: 
      results.append([i, j, ret, vol]) 
print results 
+1

您的计算看起来很简单,您为什么不使用Cython? – Tarantula 2010-06-17 21:19:15

回答

6

更新:(几乎)完全量化低于版本 “new_function2” ......

我会添加注释来解释的东西位。

它给出了〜50倍的加速比,如果你对输出是numpy数组而不是列表的输出没问题,那么可能会有更大的加速比。原因是:

In [86]: %timeit new_function2(close, volume, INTERVAL_LENGTH) 
1 loops, best of 3: 1.15 s per loop 

您可以使用np.cumsum()调用来替换内循环...见下面我的“new_function”函数。这给出了一个相当的加速...

In [61]: %timeit new_function(close, volume, INTERVAL_LENGTH) 
1 loops, best of 3: 15.7 s per loop 

VS

In [62]: %timeit old_function(close, volume, INTERVAL_LENGTH) 
1 loops, best of 3: 53.1 s per loop 

应该可以向量化整个事情,避免循环完全,但...给我一分钟,我会看到我能做什么......

import numpy as np 

ARRAY_LENGTH = 500000 
INTERVAL_LENGTH = 15 
close = np.arange(ARRAY_LENGTH, dtype=np.float) 
volume = np.arange(ARRAY_LENGTH, dtype=np.float) 

def old_function(close, volume, INTERVAL_LENGTH): 
    results = [] 
    for i in xrange(len(close) - INTERVAL_LENGTH): 
     for j in xrange(i+1, i+INTERVAL_LENGTH): 
      ret = close[j]/close[i] 
      vol = sum(volume[i+1:j+1]) 
      if (ret > 1.0001) and (ret < 1.5) and (vol > 100): 
       results.append((i, j, ret, vol)) 
    return results 


def new_function(close, volume, INTERVAL_LENGTH): 
    results = [] 
    for i in xrange(close.size - INTERVAL_LENGTH): 
     vol = volume[i+1:i+INTERVAL_LENGTH].cumsum() 
     ret = close[i+1:i+INTERVAL_LENGTH]/close[i] 

     filter = (ret > 1.0001) & (ret < 1.5) & (vol > 100) 
     j = np.arange(i+1, i+INTERVAL_LENGTH)[filter] 

     tmp_results = zip(j.size * [i], j, ret[filter], vol[filter]) 
     results.extend(tmp_results) 
    return results 

def new_function2(close, volume, INTERVAL_LENGTH): 
    vol, ret = [], [] 
    I, J = [], [] 
    for k in xrange(1, INTERVAL_LENGTH): 
     start = k 
     end = volume.size - INTERVAL_LENGTH + k 
     vol.append(volume[start:end]) 
     ret.append(close[start:end]) 
     J.append(np.arange(start, end)) 
     I.append(np.arange(volume.size - INTERVAL_LENGTH)) 

    vol = np.vstack(vol) 
    ret = np.vstack(ret) 
    J = np.vstack(J) 
    I = np.vstack(I) 

    vol = vol.cumsum(axis=0) 
    ret = ret/close[:-INTERVAL_LENGTH] 

    filter = (ret > 1.0001) & (ret < 1.5) & (vol > 100) 

    vol = vol[filter] 
    ret = ret[filter] 
    I = I[filter] 
    J = J[filter] 

    output = zip(I.flat,J.flat,ret.flat,vol.flat) 
    return output 

results = old_function(close, volume, INTERVAL_LENGTH) 
results2 = new_function(close, volume, INTERVAL_LENGTH) 
results3 = new_function(close, volume, INTERVAL_LENGTH) 

# Using sets to compare, as the output 
# is in a different order than the original function 
print set(results) == set(results2) 
print set(results) == set(results3) 
3

一个加速。将除去sum部,如在该实施方式中它通过INTERVAL_LENGTH总结长度为2的列表。相反,只需将volume[j+1]添加到循环最后一次迭代的vol的前一个结果中。因此,每次只需添加两个整数,而不是每次总结整个列表并对其进行切片。此外,而不是通过做sum(volume[i+1:j+1])开始,只是做vol = volume[i+1] + volume[j+1],因为你知道初始情况下,这里将永远是只有两个指标。

另一个加速将使用.extend而不是.append,因为python执行有extend运行速度显着加快。

您也可以拆分最后的if语句,以便仅在需要时才执行某些计算。例如,你知道if vol <= 100,你不需要计算ret

这并不是完全回答你的问题,但我认为特别是总和问题,你应该看到这些改变显着加速。

编辑 - 你也不需要len,因为你已经明确知道列表的长度(除非这只是例子)。将其定义为数字而不是len(something)总是更快。

编辑 - 执行(这是未经测试):

ARRAY_LENGTH = 500000 
INTERVAL_LENGTH = 15 
close = np.array(xrange(ARRAY_LENGTH)) 
volume = np.array(xrange(ARRAY_LENGTH)) 
close, volume = close.astype('float64'), volume.astype('float64') 

results = [] 
ex = results.extend 
for i in xrange(ARRAY_LENGTH - INTERVAL_LENGTH): 
    vol = volume[i+1] 
    for j in xrange(i+1, i+INTERVAL_LENGTH): 
     vol += volume[j+1] 
     if vol > 100: 
      ret = close[j]/close[i] 
      if 1.0001 < ret < 1.5: 
       ex([i, j, ret, vol]) 
print results 
+0

另一个加速将会定义'extend_results = results.extend'一次(在循环之前),然后在循环内部使用'extend_results([i,j,ret,vol])'来避免查找,但总是测量(用' timeit'模块) – ChristopheD 2010-06-17 21:34:45

+0

有趣!查找时间有多大,一般?通常是一种有用的加速,还是因为这个特定循环的幅度更大? – nearlymonolith 2010-06-17 21:43:01

+2

@Anthony Morelli:局部变量查找很多因为'编译器'优化了函数体,因此局部变量不需要字典查找,所以全局或内置变量查找耗时更少(另请参见:http://www.python.org/doc/essays/list2str.html) 。但是一般来说基准测试总是必要的,因为(不成功的)范围查找时间受到需要考虑的项目大小的影响等等。但是,对于这样大的循环来说,考虑这种技术是可行的(未测试) ,但它至少应该在一定程度上更快)。 – ChristopheD 2010-06-17 22:09:58

1

你为什么不尝试生成的结果作为一个单独的列表,像(不是追加或延长快得多):

results = [ t for t in ((i, j, close[j]/close[i], sum(volume[i+1:j+1])) 
         for i in xrange(len(close)-INT_LEN) 
          for j in xrange(i+1, i+INT_LEN) 
         ) 
      if t[3] > 100 and 1.0001 < t[2] < 1.5 
      ]