2010-04-07 91 views
7

I recently asked about trying to optimise a Python loop for a scientific application,并且收到我的an excellent, smart way of recoding it within NumPy which reduced execution time by a factor of around 100在纯NumPy中重写for循环以减少执行时间

但是,B值的计算实际上嵌套在其他几个循环中,因为它是在常规的位置网格中进行计算的。是否有类似智能的NumPy重写来缩短这个过程的时间?

我怀疑这个部分的性能增益不会很明显,并且其缺点大概是不可能向用户报告计算进度,结果不能写入输出文件直到计算结束,并且可能在一个巨大的步骤中这样做会产生内存影响?是否有可能绕过这些?你可以做

import numpy as np 
import time 

def reshape_vector(v): 
    b = np.empty((3,1)) 
    for i in range(3): 
     b[i][0] = v[i] 
    return b 

def unit_vectors(r): 
    return r/np.sqrt((r*r).sum(0)) 

def calculate_dipole(mu, r_i, mom_i): 
    relative = mu - r_i 
    r_unit = unit_vectors(relative) 
    A = 1e-7 

    num = A*(3*np.sum(mom_i*r_unit, 0)*r_unit - mom_i) 
    den = np.sqrt(np.sum(relative*relative, 0))**3 
    B = np.sum(num/den, 1) 
    return B 

N = 20000 # number of dipoles 
r_i = np.random.random((3,N)) # positions of dipoles 
mom_i = np.random.random((3,N)) # moments of dipoles 
a = np.random.random((3,3)) # three basis vectors for this crystal 
n = [10,10,10] # points at which to evaluate sum 
gamma_mu = 135.5 # a constant 

t_start = time.clock() 
for i in range(n[0]): 
    r_frac_x = np.float(i)/np.float(n[0]) 
    r_test_x = r_frac_x * a[0] 
    for j in range(n[1]): 
     r_frac_y = np.float(j)/np.float(n[1]) 
     r_test_y = r_frac_y * a[1] 
     for k in range(n[2]): 
      r_frac_z = np.float(k)/np.float(n[2]) 
      r_test = r_test_x +r_test_y + r_frac_z * a[2] 
      r_test_fast = reshape_vector(r_test) 
      B = calculate_dipole(r_test_fast, r_i, mom_i) 
      omega = gamma_mu*np.sqrt(np.dot(B,B)) 
      # write r_test, B and omega to a file 
    frac_done = np.float(i+1)/(n[0]+1) 
    t_elapsed = (time.clock()-t_start) 
    t_remain = (1-frac_done)*t_elapsed/frac_done 
    print frac_done*100,'% done in',t_elapsed/60.,'minutes...approximately',t_remain/60.,'minutes remaining' 

回答

2

一个明显的事情是与替代线

r_test_fast = reshape_vector(r_test) 

r_test_fast = r_test.reshape((3,1)) 

可能不会做出任何性能上的很大差异,但在任何情况下,它是有道理的使用numpy builtin而不是重新发明轮子。

一般来说,正如您现在可能已经注意到的那样,优化numpy的技巧是使用numpy全数组操作来表示算法,或者至少使用切片而不是遍历Python代码中的每个元素。趋于防止这种“矢量化”的是所谓的循环携带依赖性,即循环,其中每次迭代取决于先前迭代的结果。简单地看一下你的代码,你就没有这种东西,应该可以将代码向量化。

编辑:一种解决方案

我还没有证实这是正确的,但应该给你如何对待它的想法。

首先,取cartesian() function, which we'll use。然后

 

def calculate_dipole_vect(mus, r_i, mom_i): 
    # Treat each mu sequentially 
    Bs = [] 
    omega = [] 
    for mu in mus: 
     rel = mu - r_i 
     r_norm = np.sqrt((rel * rel).sum(1)) 
     r_unit = rel/r_norm[:, np.newaxis] 
     A = 1e-7 

     num = A*(3*np.sum(mom_i * r_unit, 0)*r_unit - mom_i) 
     den = r_norm ** 3 
     B = np.sum(num/den[:, np.newaxis], 0) 
     Bs.append(B) 
     omega.append(gamma_mu * np.sqrt(np.dot(B, B))) 
    return Bs, omega 


# Transpose to get more "natural" ordering with row-major numpy 
r_i = r_i.T 
mom_i = mom_i.T 

t_start = time.clock() 
r_frac = cartesian((np.arange(n[0])/float(n[0]), 
        np.arange(n[1])/float(n[1]), 
        np.arange(n[2])/float(n[2]))) 
r_test = np.dot(r_frac, a) 
B, omega = calculate_dipole_vect(r_test, r_i, mom_i) 

print 'Total time for vectorized: %f s' % (time.clock() - t_start) 
 

那么,在我的测试中,这实际上比我开始的基于循环的方法稍慢。问题是,在问题的原始版本中,它已经通过整形阵列(20000,3)的全数组操作进行了矢量化,因此任何进一步的矢量化都没有带来太多的好处。事实上,如上所述,这可能会使性能恶化,这可能是由于大型临时阵列造成的。

+0

我认为贾斯汀对配置文件的建议可能是明智的,但非常感谢......虽然我不确定我会使用它,但我认为试图理解这个例子可能是一种非常好的学习方式。 :) – Statto 2010-04-07 16:10:52

2

如果你的代码是profile,你会发现99%的运行时间在calculate_dipole之内,所以减少这个循环的时间确实不会显着减少执行时间。如果你想让这个更快,你仍然需要关注calculate_dipole。我在这方面尝试了我的Cython代码calculate_dipole,并在总体时间内减少了大约2倍。也可能有其他方法来改进Cython代码。