2017-02-16 126 views
4

Cython启动器在这里。我试图通过使用多个线程来加速计算某些成对统计量(在几个分箱中)。特别是,我使用cython.parallel中的prange,它在内部使用openMP。Cython:使prange并行化线程安全

下面的最小例子说明了这个问题(通过Jupyter笔记本Cython magic编译)。

笔记本设置:

%load_ext Cython 
import numpy as np 

用Cython代码:

%%cython --compile-args=-fopenmp --link-args=-fopenmp -a 

from cython cimport boundscheck 
import numpy as np 
from cython.parallel cimport prange, parallel 

@boundscheck(False) 
def my_parallel_statistic(double[:] X, double[:,::1] bins, int num_threads): 

    cdef: 
     int N = X.shape[0] 
     int nbins = bins.shape[0] 
     double Xij,Yij 
     double[:] Z = np.zeros(nbins,dtype=np.float64) 
     int i,j,b 

    with nogil, parallel(num_threads=num_threads): 
     for i in prange(N,schedule='static',chunksize=1): 
      for j in range(i): 
       #some pairwise quantities 
       Xij = X[i]-X[j] 
       Yij = 0.5*(X[i]+X[j]) 
       #check if in bin 
       for b in range(nbins): 
        if (Xij < bins[b,0]) or (Xij > bins[b,1]): 
         continue 
        Z[b] += Xij*Yij 

    return np.asarray(Z) 

模拟数据和箱

X = np.random.rand(10000) 
bin_edges = np.linspace(0.,1,11) 
bins = np.array([bin_edges[:-1],bin_edges[1:]]).T 
bins = bins.copy(order='C') 

时序经由

%timeit my_parallel_statistic(X,bins,1) 
%timeit my_parallel_statistic(X,bins,4) 

产量

1 loop, best of 3: 728 ms per loop 
1 loop, best of 3: 330 ms per loop 

这是不是一个完美的比例,但是这不是问题的重点。 (但不要让我知道,如果你有超越添加常用的装饰或微调PRANGE参数的建议。)

然而,这种计算显然不是线程安全的:

Z1 = my_parallel_statistic(X,bins,1) 
Z4 = my_parallel_statistic(X,bins,4) 
np.allclose(Z1,Z4) 

透着显著差异在这两个结果之间(在这个例子中高达20%)。

我强烈怀疑,问题是,多个线程可以在同一时间做

Z[b] += Xij*Yij 

。但是我不知道如何在不牺牲加速的情况下解决这个问题。

在我的实际使用情况中,Xij和Yij的计算更加昂贵,因此我希望每对执行一次。此外,预先计算和存储所有对的Xij和Yij,然后简单地循环通过bin不是一个好选择,因为N可以变得非常大,并且我不能在内存中存储100,000 x 100,000个numpy数组(这实际上是在Cython中重写它的主要动机!)。

系统信息(中添加注释如下建议):

CPU(s): 8 
Model name: Intel(R) Core(TM) i7-4790K CPU @ 4.00GHz 
OS: Red Hat Linux v6.8 
Memory: 16 GB 
+0

每个线程中的动作是否真的独立于任何其他动作?首先运行哪一个是否重要?如果存在任何类型的依赖性,这不适合并行操作。 – hpaulj

+0

只要每个线程创建自己的Xij和Yij,他们应该是独立的(但也许这是问题?)就数学而言,Xij和Yij独立计算每对(i,j) ,因此也是对统计量Z的贡献。 – user4319496

+1

谢谢你在你的问题中包含如此出色的[mcve]!这样一个经过深入研究和制定的问题在SO上太稀缺了。你可能包含的唯一东西是你的CPU模型和内存来评论性能,但这不是问题的主要观点。 – Zulan

回答

4

是的,Z[b] += Xij*Yij确实是一种竞赛条件。

有几个选项可以使atomiccritical。抛开Cython的实施问题,由于共享的Z矢量上的虚假共享,您在任何情况下都会导致性能不佳。

所以更好的选择是为每个线程保留一个专用数组。还有一些(非)选项。可以使用私人的malloc'd指针,但我想坚持np。内存片不能被分配为私有变量。二维(num_threads, nbins)数组可行,但由于某种原因会生成非常复杂的低效数组索引代码。这有效,但速度较慢,不能缩放。

具有手动“2D”索引的平面numpy阵列效果很好。通过避免将数组的私有部分填充为64字节(这是典型的高速缓存行大小),可以获得一些额外的性能。这可以避免核心之间的错误共享。私密部分简单地在平行区域外串联。

%%cython --compile-args=-fopenmp --link-args=-fopenmp -a 
from cython cimport boundscheck 
import numpy as np 
from cython.parallel cimport prange, parallel 
cimport openmp 

@boundscheck(False) 
def my_parallel_statistic(double[:] X, double[:,::1] bins, int num_threads): 

    cdef: 
     int N = X.shape[0] 
     int nbins = bins.shape[0] 
     double Xij,Yij 
     # pad local data to 64 byte avoid false sharing of cache-lines 
     int nbins_padded = (((nbins - 1) // 8) + 1) * 8 
     double[:] Z_local = np.zeros(nbins_padded * num_threads,dtype=np.float64) 
     double[:] Z = np.zeros(nbins) 
     int i,j,b, bb, tid 

    with nogil, parallel(num_threads=num_threads): 
     tid = openmp.omp_get_thread_num() 
     for i in prange(N,schedule='static',chunksize=1): 
      for j in range(i): 
       #some pairwise quantities 
       Xij = X[i]-X[j] 
       Yij = 0.5*(X[i]+X[j]) 
       #check if in bin 
       for b in range(nbins): 
        if (Xij < bins[b,0]) or (Xij > bins[b,1]): 
         continue 
        Z_local[tid * nbins_padded + b] += Xij*Yij 
    for tid in range(num_threads): 
     for bb in range(nbins): 
      Z[bb] += Z_local[tid * nbins_padded + bb] 


    return np.asarray(Z) 

这表现相当好我的4核机器上,用720 ms/191 ms,3.6的加速。剩余的差距可能是由于涡轮模式。我现在无法使用适当的机器进行测试。

+0

感谢您的伟大答案,给出了一个固定的版本和背景信息来理解它! PS:我在代码中修复了一个小错误:在最后的串行循环中,索引应该是bb而不是b(修复等待审查/批准)。 – user4319496

1

你是正确的,将获得Z是一个竞争条件下。

num_threads的副本Z定义为cdef double[:] Z = np.zeros((num_threads, nbins), dtype=np.float64),并在prange循环后执行沿轴0的和可能会更好。

return np.sum(Z, axis=0) 

用Cython代码可以在一个并行区域with gil声明,但它只是记录错误处理。您可以查看一般的C代码,看看是否会触发原子OpenMP操作,但我怀疑它。