2017-08-25 71 views
1

将此嵌套循环与携带依赖并行化的最佳方法是什么? Konwing它在一个函数中,我从main调用n次。OpenMP - 如何将循环与携带依赖并行化

[编辑]

funct(unsigned char*** grid, int n) { 

# pragma omp parallel for num_threads(thread_count) default(none) \ 
    shared(grid, n, cur) private(i, j) 

     for(i = 1; i <= n+1; i++) { 
      for(j = 1; j <= n; j++) { 
       if(grid[cur][i-1][j] == 2 && grid[cur][i][j] == 0) { 
        grid[1-cur][i-1][j] = 0; 
        grid[1-cur][i][j] = 2; 
       } 
       else { 
        grid[1-cur][i][j] = grid[cur][i][j]; 
       } 
      } 
     } 
    } 

主营:

cur = 0; 

for(s = 0; s < steps; s++) { 
    funct(grid, N); 
    cur = 1-cur; 
    funct_2(grid, N) 
    cur = 1-cur 
} 

此代码运行没有错误,但返回不正确的结果(.ppm格式文件)。

+0

@HighPerformanceMark如果我明白这是我已经在'cur'和'1-cur'之间切换。但是如果我在外循环之前使用'pragma omp parallel for',那么它不起作用。那么让我们假设它不会产生与串行代码相同的结果(ppm文件)。 – Caramelleamare

+1

您最好展示导致问题的真实代码,您发布的内容中没有任何OpenMP杂注,因此很难做出更多有关错误的猜测。或者,如果不是真正的代码a [mcve],它恰当地说明问题。 –

+0

@HighPerformanceMark实际代码太长且详细。不过,我已经编辑过问题,现在我希望它更好。 – Caramelleamare

回答

1

循环嵌套优化/并行化是我博士研究的主题。下面的代码是使用我自己的优化编译器自动生成的。它涉及平铺转换,并行执行提取的(平铺式)无同步切片。

#define min(x,y) ((x) < (y) ? (x) : (y)) 
#define floord(n,d) (((n)<0) ? -((-(n)+(d)-1)/(d)) : (n)/(d)) 
#pragma scop 
#pragma omp parallel for 
for (register int ir0 = 0; ir0 <= floord(n, 32); ir0 += 1) { 
    for (register int ir1 = 0; ir1 <= floord(n - 1, 32); ir1 += 1) { 
    if (ir0 == 0) { 
     for (register int i1 = 32 * ir1 + 1; i1 <= min(n, 32 * ir1 + 32); i1 += 1) { 
     grid[-cur + 1][0][i1] = (((grid[cur][0][i1] == 2) && (grid[cur][1][i1] == 0)) ? 0 : grid[-cur + 1][0][i1]); 
     } 
    } 
    if (n >= 32 * ir0 + 1) { 
     for (register int ii0 = ir0; ii0 <= min(ir0 + 1, n/32); ii0 += 1) { 
     for (register int i0 = 32 * ii0 + 1; i0 <= min(n + 1, 31 * ir0 + ii0 + 32); i0 += 1) { 
      for (register int i1 = 32 * ir1 + 1; i1 <= min(n, 32 * ir1 + 32); i1 += 1) { 
      if (i0 >= 32 * ir0 + 2) { 
       grid[-cur + 1][i0 - 1][i1] = (((grid[cur][i0 - 1][i1] == 2) && (grid[cur][i0][i1] == 0)) ? 0 : grid[-cur + 1][i0 - 1][i1]); 
      } 
      if (ii0 == ir0) { 
       grid[-cur + 1][i0][i1] = (((grid[cur][i0 - 1][i1] == 2) && (grid[cur][i0][i1] == 0)) ? 2 : grid[cur][i0][i1]); 
      } 
      } 
     } 
     } 
    } else { 
     for (register int i1 = 32 * ir1 + 1; i1 <= 32 * ir1 + 32; i1 += 1) { 
     grid[-cur + 1][n + 1][i1] = (((grid[cur][n][i1] == 2) && (grid[cur][n + 1][i1] == 0)) ? 2 : grid[cur][n + 1][i1]); 
     } 
    } 
    } 
} 
#pragma endscop 

我测试所述代码与8个线程上grid[2][N][N],操作,其中N = {2500,5500}。与原始代码的串行执行相比,我获得了3倍的加速比。

+0

让我知道它是否有效。结果应该是正确的。 –

+0

你说得对,它的功能非常好,但是正如我所说的那样,对我的产品来说太高级了,远远超过了我的其他基本代码。无论如何感谢支持。 – Caramelleamare