2016-03-01 88 views
3

目前,我编程的k-均值++使用OpenMP和C.并行版本直到现在,我实施质心的初始化。如果您对此过程不熟悉,则其工作原理与follows大致相同。给定一个dataset(基体)与n点,k质心是使用“概率函数”,也被称为轮盘赌选择initilized。OpenMP程序(k均值++)不结垢

假设你有n=4点,距离下面的数组一些重心:

distances = [2, 4, 6, 8] 
dist_sum = 20 

从这些,除以distances每个条目由dist_sum,并添加以前的结果,就像定义一个累积概率数组这:

probs  = [0.1, 0.2, 0.3, 0.4] = [2/20, 4/20, 6/20, 8/20] 
acc_probs = [0.1, 0.3, 0.6, 1.0] 

然后,执行轮盘赌选择。给定一个随机数,说r=0.5,选择使用racc_probs,下一个点迭代acc_probs直到r < acc_probs[i]。在这个例子中,所选择的点是因为i=2r < acc_probs[2]

问题 在这种情况下,我非常大的矩阵(约点n=16 000 000)工作。尽管该程序给出了正确的答案(即重心的良好初始化),但它并没有如预期的那样成比例。该函数按照此算法计算初始质心。

double **parallel_init_centroids (double **dataset, int n, int d, int k, RngStream randomizer, long int *total_ops) { 

    double dist=0, error=0, dist_sum=0, r=0, partial_sum=0, mindist=0; 
    int cn=0, cd=0, ck = 0, cck = 0, idx = 0; 
    ck = 0; 

    double probs_sum = 0; // debug 

    int mink=0, id=0, cp=0; 

    for (ck = 0; ck < k; ck++) { 

     if (ck == 0) { 

      // 1. choose an initial centroid c_0 from dataset randomly 
      idx = RngStream_RandInt (randomizer, 0, n-1); 

     } 
     else { 

      // 2. choose a successive centroid c_{ck} using roulette selection 
      r = RngStream_RandU01 (randomizer); 
      idx = 0; 
      partial_sum = 0; 
      for (cn=0; cn<n; cn++) { 

       partial_sum = partial_sum + distances[cn]/dist_sum; 

       if (r < partial_sum) { 
        idx = cn; 
        break; 
       } 
      } 
     } 

     // 3. copy centroid from dataset 
     for (cd=0; cd<d; cd++) 
      centroids[ck][cd] = dataset[idx][cd]; 

     // reset before parallel region 
     dist_sum = 0; 

     // -- parallel region -- 
     # pragma omp parallel shared(distances, clusters, centroids, dataset, chunk, dist_sum_threads, total_ops_threads) private(id, cn, cck, cd, cp, error, dist, mindist, mink) 
     { 
      id = omp_get_thread_num(); 
      dist_sum_threads[id] = 0;    // each thread reset its entry 

      // parallel loop 
      // 4. recompute distances against centroids 
      # pragma omp for schedule(static,chunk) 
      for (cn=0; cn<n; cn++) { 

       mindist = DMAX; 
       mink = 0; 

       for (cck=0; cck<=ck; cck++) { 

        dist = 0; 

        for (cd=0; cd<d; cd++) { 

         error = dataset[cn][cd] - centroids[ck][cd]; 
         dist = dist + (error * error);     total_ops_threads[id]++; 
        } 

        if (dist < mindist) { 
         mindist = dist; 
         mink = ck; 
        } 
       } 

       distances[cn]   = mindist; 
       clusters[cn]   = mink; 
       dist_sum_threads[id] += mindist;  // each thread contributes before reduction 
      } 
     } 
     // -- parallel region -- 

     // 5. sequential reduction 
     dist_sum = 0; 
     for (cp=0; cp<p; cp++) 
      dist_sum += dist_sum_threads[cp]; 
    } 


    // stats 
    *(total_ops) = 0; 
    for (cp=0; cp<p; cp++) 
     *(total_ops) += total_ops_threads[cp]; 

    // free it later 
    return centroids; 
} 

正如你所看到的,并行区域计算中nd维点对kd维质心的距离。这项工作在p线程(最多32个)之间共享。并行区域结束后,两个阵列被填充:distancesdist_sum_threads。第一个数组与前一个示例相同,而第二个数组包含每个线程收集的累积距离。考虑前面的例子,如果p=2线程是可用的,那么该阵列被定义如下:

dist_sum_threads[0] = 6 ([2, 4]) # filled by thread 0 
dist_sum_threads[1] = 14 ([6, 8]) # filled by thread 1 

dist_sum通过加入的dist_sum_threads每个条目定义。该函数按预期工作,但是当线程数量增加时,执行时间会增加。这figure显示了一些性能指标。

出了什么问题我的执行,特别是使用OpenMP?综上所述,采用了只有两个杂注:

# pragma omp parallel ... 
{ 
    get thread id 

    # pragma omp for schedule(static,chunk) 
    { 
     compute distances ... 
    } 

    fill distances and dist_sum_threads[id] 
} 

换句话说,我消除了障碍,互斥访问,以及其他的东西,可能会导致额外的开销。但是,执行时间随着线程数量的增加而变得最差。

更新

  • 上面的代码已经被更改为mcveThis snippet与我以前的代码类似。在这种情况下,计算n=100000点和k=16质心之间的距离。
  • 执行时间在并行区域之前和之后使用omp_get_wtime进行测量。总时间存储在wtime_spent中。
  • 我包含一个减少计算dist_sum。然而,它并不像预期的那样工作(它在下面被评论为不好的并行减少)。 dist_sum的正确值是999857108020.0,但是当使用p线程来计算它时,结果为999857108020.0 * p,这是错误的。
  • 性能情节updated
  • 这是主要的并行功能,完整的代码位于here

    double **parallel_compute_distances (double **dataset, int n, int d, int k, long int *total_ops) { 
    
        double dist=0, error=0, mindist=0; 
    
        int cn, cd, ck, mink, id, cp; 
    
        // reset before parallel region 
        dist_sum = 0;    
    
        // -- start time -- 
        wtime_start = omp_get_wtime(); 
    
        // parallel loop 
        # pragma omp parallel shared(distances, clusters, centroids, dataset, chunk, dist_sum, dist_sum_threads) private(id, cn, ck, cd, cp, error, dist, mindist, mink) 
        { 
         id = omp_get_thread_num(); 
         dist_sum_threads[id] = 0;    // reset 
    
         // 2. recompute distances against centroids 
         # pragma omp for schedule(static,chunk) 
         for (cn=0; cn<n; cn++) { 
    
          mindist = DMAX; 
          mink = 0; 
    
          for (ck=0; ck<k; ck++) { 
    
           dist = 0; 
    
           for (cd=0; cd<d; cd++) { 
    
            error = dataset[cn][cd] - centroids[ck][cd]; 
            dist = dist + (error * error);        total_ops_threads[id]++; 
           } 
    
           if (dist < mindist) { 
            mindist = dist; 
            mink = ck; 
           } 
          } 
    
          distances[cn]   = mindist; 
          clusters[cn]   = mink; 
          dist_sum_threads[id] += mindist; 
         } 
    
    
         // bad parallel reduction 
         //#pragma omp parallel for reduction(+:dist_sum) 
         //for (cp=0; cp<p; cp++){    
         // dist_sum += dist_sum_threads[cp]; 
         //} 
    
        } 
    
    
        // -- end time -- 
        wtime_end = omp_get_wtime(); 
    
        // -- total wall time -- 
        wtime_spent = wtime_end - wtime_start; 
    
        // sequential reduction 
        for (cp=0; cp<p; cp++)  
         dist_sum += dist_sum_threads[cp]; 
    
    
        // stats 
        *(total_ops) = 0; 
        for (cp=0; cp<p; cp++) 
         *(total_ops) += total_ops_threads[cp]; 
    
        return centroids; 
    } 
    

回答

2

您的代码不是一个mcve我只能在这里发出的假说。然而,这里就是我的想法(可能)发生(按重要性没有特定的顺序):

  • 你从错误共享苦,当你更新dist_sum_threadstotal_ops_threads。只需在parallel区域内声明reduction(+: dist_sum)并直接使用dist_sum即可完全避免前者。您也可以使用同样使用本地total_ops宣布为reduction(+),并且您最后将其累积到*total_ops中。 (顺便说一句,dist_sum是计算,但从来没有用过......)

  • 该代码看起来无论如何都是内存绑定,因为你有大量的内存访问几乎没有计算。因此,预期的加速将主要取决于您的内存带宽和可以在并行代码时访问的内存控制器的数量。有关更多详细信息,请参阅this epic answer

  • 鉴于上述可能遇到的问题的内存绑定特性,请尝试使用内存放置(numactl可能和/或与proc_bind的线程关联)。您也可以尝试使用线程调度策略,或尝试查看是否有一些循环平铺无法应用于将数据阻塞到缓存中的问题。

  • 您没有详细说明测量时间的方式,但请注意,加速时间仅适用于挂钟时间而非CPU时间。请使用omp_get_wtime()进行任何此类测量。

试着解决这些问题,并根据您的内存架构评估您的实际潜在加速。如果你仍然觉得自己没有达到你应该做的,那么就更新你的问题。


编辑

既然你提供了一个完整的例子,我成功地试验了一下你的代码,并实施修改我脑子里想的(减少伪共享居多)。

这里是什么功能没有的样子:

double **parallel_compute_distances(double **dataset, int n, int d, 
            int k, long int *total_ops) { 
    // reset before parallel region 
    dist_sum = 0;    

    // -- start time -- 
    wtime_start = omp_get_wtime(); 

    long int tot_ops = 0; 

    // parallel loop 
    # pragma omp parallel for reduction(+: dist_sum, tot_ops) 
    for (int cn = 0; cn < n; cn++) { 
     double mindist = DMAX; 
     int mink = 0; 
     for (int ck = 0; ck < k; ck++) { 
      double dist = 0; 
      for (int cd = 0; cd < d; cd++) { 
       double error = dataset[cn][cd] - centroids[ck][cd]; 
       dist += error * error; 
       tot_ops++; 
      } 
      if (dist < mindist) { 
       mindist = dist; 
       mink = ck; 
      } 
     } 
     distances[cn] = mindist; 
     clusters[cn] = mink; 
     dist_sum += mindist; 
    } 

    // -- end time -- 
    wtime_end = omp_get_wtime(); 

    // -- total wall time -- 
    wtime_spent = wtime_end - wtime_start; 

    // stats 
    *(total_ops) = tot_ops; 

    return centroids; 
} 

所以,谈几点看法:

  • 如前所述,dist_sum和操作总数的局部变量(tot_ops )现在宣布为reduction(+:)。这避免了每个索引使用一个线程访问相同的数组,这触发了false sharing(这几乎是触发它的最佳情况)。我使用了局部变量而不是total_ops作为指针,它不能直接在reduction子句中使用。但是,最后用tot_ops更新它可以完成这项工作。

  • 我尽可能延迟了所有的变量声明。这是一个很好的做法,因为它可以避免大部分private声明,这些声明通常是OpenMP程序员的主要缺陷。现在你只需要考虑两个reduction变量和两个数组,它们显然是shared,因此不需要任何额外的声明。这简化了很多parallel指令,并有助于集中于哪些事项

  • 现在不需要线程ID的越多,parallelfor指令可以合并为更好的可读性(以及可能的表现太)。

  • 我删除了schedule子句,让编译器和/或运行时库使用它们的默认值。如果我有充分的理由,我只会选择不同的调度策略。

就这样,在我的双核笔记本电脑唱GCC 5.3.0,并与-std=c99 -O3 -fopenmp -mtune=native -march=native编译,我得到不同的线程数和2倍的2个线程加速一致的结果。

在一个10核的机器,使用英特尔编译器和-std=c99 -O3 -xhost -qopenmp,我从1到10个线程的线性加速...

即使是在至强融核KNC我能得到接近线性的速度 - 从1到60个线程(然后使用更多的硬件线程仍然提供一些加速,但不是相同的扩展)。

观察到的加速度使我意识到,与我所设想的不同,代码不受内存限制,因为您访问的数组实际上被很好地缓存。原因在于您只能访问dataset[cn][cd]centroids[ck][cd],其中第二个维度非常小(40和16),因此很适合缓存,而要加载下一个cn索引的大块dataset可以有效地预取。

您仍然遇到此版本的代码的可扩展性问题?

+0

“即使在Xeon Phi KNC上,我也可以从1到60个线程获得接近线性的速度(然后使用更多的硬件线程仍然可以提高速度,但不会达到相同的程度)。” KNC有60个核心,但可以运行4个线程/核心。因此(假设你正在使用分散关系),前60个线程每个都有一个完整的内核,但是60个以上的线程会在同一个内核上有多个HW线程。因此,您不应该期望获得相同的性能。最好在x和单独的线上用1T/C,2T/C,4T/C的内核绘制缩放比例。 (KMP_PLACE_THREADS环境可以提供帮助) –