4

我是新来的线程编程,我有一个概念问题。我正在做矩阵乘法作为我班的项目。然而,我没有使用线程,然后使用线程为回答矩阵的每个单元计算标量乘积,然后再次将第一个矩阵分成比例,以便每个线程都有相等的部分进行计算。我的问题是标量产品实现非常快速完成,这是我所期望的,但第三个实现不会比非线程实现计算出的答案快得多。例如,如果它使用2个线程,它会在大约一半时间内复制它,因为它可以同时在矩阵的两个半部分上工作,但事实并非如此。我觉得在第三个实现中有一个问题,我不认为它并行运行,代码如下。任何人都可以直接在我这里?并非所有的代码都与问题相关,但我将它包括在内以防问题不在本地。 谢谢,关于线程的问题

主要课程:

#include <iostream> 
#include <cstdio> 
#include <cstdlib> 
#include <cmath> 
#include<fstream> 
#include<string> 
#include<sstream> 

#include <matrix.h> 
#include <timer.h> 
#include <random_generator2.h> 

const float averager=2.0; //used to find the average of the time taken to multiply the matrices. 

//Precondition: The matrix has been manipulated in some way and is ready to output the statistics 
//Outputs the size of the matrix along with the user elapsed time. 
//Postconidition: The stats are outputted to the file that is specified with the number of threads used 
//file name example: "Nonparrallel2.dat" 
void output(string file, int numThreads , long double time, int n); 

//argv[1] = the size of the matrix 
//argv[2] = the number of threads to be used. 
//argv[3] = 
int main(int argc, char* argv[]) 
{ 
    random_generator rg; 
    timer t, nonparallel, scalar, variant; 
    int n, total = 0, numThreads = 0; 
    long double totalNonP = 0, totalScalar = 0, totalVar = 0; 

    n = 100; 

/* 
* check arguments 
*/ 
     n = atoi(argv[1]); 
     n = (n < 1) ? 1 : n; 
     numThreads = atoi(argv[2]); 
/* 
* allocated and generate random strings 
*/ 
    int** C; 
    int** A; 
    int** B; 

    cout << "**NOW STARTING ANALYSIS FOR " << n << " X " << n << " MATRICES WITH " << numThreads << "!**"<< endl; 

    for (int timesThrough = 0; timesThrough < averager; timesThrough++) 
    { 

     cout << "Creating the matrices." << endl; 
     t.start(); 
     C = create_matrix(n); 
     A = create_random_matrix(n, rg); 
     B = create_random_matrix(n, rg); 
     t.stop(); 

     cout << "Timer (generate): " << t << endl; 

     //---------------------------------------------------------Ends non parallel----------------------------- 
     /* 
     * run algorithms 
     */ 
      cout << "Running non-parallel matrix multiplication: " << endl; 
      nonparallel.start(); 
      multiply(C, A, B, n); 
      nonparallel.stop(); 
     //-----------------------------------------Ends non parallel---------------------------------------------- 


     //cout << "The correct matrix" <<endl; 
     //output_matrix(C, n); 

      cout << "Timer (multiplication): " << nonparallel << endl; 
      totalNonP += nonparallel.user(); 


      //D is the transpose of B so that the p_scalarproduct function does not have to be rewritten 
      int** D = create_matrix(n); 
      for (int i = 0; i < n; i++) 
      for(int j = 0; j < n; j++) 
       D[i][j] = B[j][i]; 
     //---------------------------------------------------Start Threaded Scalar Poduct-------------------------- 
      cout << "Running scalar product in parallel" << endl; 
      scalar.start(); 
      //Does the scalar product in parallel to multiply the two matrices. 
      for (int i = 0; i < n; i++) 
      for (int j = 0; j < n; j++){ 
      C[i][j] = 0; 
      C[i][j] = p_scalarproduct(A[i],D[j],n,numThreads); 
      }//ends the for loop with j 
      scalar.stop(); 

      cout << "Timer (scalar product in parallel): " << scalar << endl; 
      totalScalar += scalar.user(); 
     //---------------------------------------------------Ends Threaded Scalar Poduct------------------------ 


     //---------------------------------------------------Starts Threaded Variant For Loop--------------- 
      cout << "Running the variation on the for loop." << endl; 
      boost :: thread** thrds; 


      //create threads and bind to p_variantforloop_t 
      thrds = new boost::thread*[numThreads]; 

      variant.start(); 
      for (int i = 1; i <= numThreads; i++) 
       thrds[i-1] = new boost::thread(boost::bind(&p_variantforloop_t, 
         C, A, B, ((i)*n - n)/numThreads ,(i * n)/numThreads, numThreads, n)); 
cout << "before join" <<endl; 
      // join threads 
       for (int i = 0; i < numThreads; i++) 
      thrds[i]->join(); 
      variant.stop(); 

      // cleanup 
       for (int i = 0; i < numThreads; i++) 
      delete thrds[i]; 
       delete[] thrds; 

     cout << "Timer (variation of for loop): " << variant <<endl; 
     totalVar += variant.user(); 
     //---------------------------------------------------Ends Threaded Variant For Loop------------------------ 

     // output_matrix(A, n); 
     // output_matrix(B, n); 
     // output_matrix(E,n); 

     /* 
     * free allocated storage 
     */ 

     cout << "Deleting Storage" <<endl; 

      delete_matrix(A, n); 
      delete_matrix(B, n); 
      delete_matrix(C, n); 
      delete_matrix(D, n); 

     //avoids dangling pointers 
      A = NULL; 
      B = NULL; 
      C = NULL; 
      D = NULL; 
    }//ends the timesThrough for loop 

    //output the results to .dat files 
    output("Nonparallel", numThreads, (totalNonP/averager) , n); 
    output("Scalar", numThreads, (totalScalar/averager), n); 
    output("Variant", numThreads, (totalVar/averager), n); 

    cout << "Nonparallel = " << (totalNonP/averager) << endl; 
    cout << "Scalar = " << (totalScalar/averager) << endl; 
    cout << "Variant = " << (totalVar/averager) << endl; 

    return 0; 
} 

void output(string file, int numThreads , long double time, int n) 
{ 
    ofstream dataFile; 
    stringstream ss; 

    ss << numThreads; 
    file += ss.str(); 
    file += ".dat"; 

    dataFile.open(file.c_str(), ios::app); 
    if(dataFile.fail()) 
    { 
     cout << "The output file didn't open." << endl; 
     exit(1); 
    }//ends the if statement. 
    dataFile << n << "  " << time << endl; 
    dataFile.close(); 
}//ends optimalOutput function 

矩阵文件:

#include <matrix.h> 
#include <stdlib.h> 

using namespace std; 

int** create_matrix(int n) 
{ 
    int** matrix; 

    if (n < 1) 
    return 0; 

    matrix = new int*[n]; 
    for (int i = 0; i < n; i++) 
    matrix[i] = new int[n]; 

    return matrix; 
} 

int** create_random_matrix(int n, random_generator& rg) 
{ 
    int** matrix; 

    if (n < 1) 
    return 0; 

    matrix = new int*[n]; 
    for (int i = 0; i < n; i++) 
    { 
     matrix[i] = new int[n]; 
     for (int j = 0; j < n; j++) 
    //rg >> matrix[i][j]; 
    matrix[i][j] = rand() % 100; 
    } 

    return matrix; 
} 

void delete_matrix(int** matrix, int n) 
{ 
    for (int i = 0; i < n; i++) 
     delete[] matrix[i]; 

    delete[] matrix; 

    //avoids dangling pointers. 
    matrix = NULL; 
} 

/* 
* non-parallel matrix multiplication 
*/ 
void multiply(int** C, int** A, int** B, int n) 
{ 
    if ((C == A) || (C == B)) 
    { 
     cout << "ERROR: C equals A or B!" << endl; 
     return; 
    } 

    for (int i = 0; i < n; i++) 
    for (int j = 0; j < n; j++) 
     { 
    C[i][j] = 0; 
    for (int k = 0; k < n; k++) 
     C[i][j] += A[i][k] * B[k][j]; 
    } 
} 

void p_scalarproduct_t(int* c, int* a, int* b, 
        int s, int e, boost::mutex* lock) 
{ 
    int tmp; 

    tmp = 0; 
    for (int k = s; k < e; k++){ 
    tmp += a[k] * b[k]; 
//cout << "a[k]= "<<a[k]<<"b[k]= "<< b[k] <<" "<<k<<endl; 
} 
    lock->lock(); 
    *c = *c + tmp; 
    lock->unlock(); 
} 

int p_scalarproduct(int* a, int* b, int n, int m) 
{ 
    int c; 
    boost::mutex lock; 
    boost::thread** thrds; 

    c = 0; 

/* create threads and bind to p_merge_sort_t */ 
    thrds = new boost::thread*[m]; 
    for (int i = 0; i < m; i++) 
    thrds[i] = new boost::thread(boost::bind(&p_scalarproduct_t, 
          &c, a, b, i*n/m, (i+1)*n/m, &lock)); 
/* join threads */ 
    for (int i = 0; i < m; i++) 
    thrds[i]->join(); 

/* cleanup */ 
    for (int i = 0; i < m; i++) 
    delete thrds[i]; 
    delete[] thrds; 

    return c; 
} 

void output_matrix(int** matrix, int n) 
{ 
    cout << "["; 
    for (int i = 0; i < n; i++) 
    { 
     cout << "[ "; 
     for (int j = 0; j < n; j++) 
    cout << matrix[i][j] << " "; 
     cout << "]" << endl; 
    } 
    cout << "]" << endl; 
} 





void p_variantforloop_t(int** C, int** A, int** B, int s, int e, int numThreads, int n) 
{ 
//cout << "s= " <<s<<endl<< "e= " << e << endl; 
    for(int i = s; i < e; i++) 
     for(int j = 0; j < n; j++){ 
      C[i][j] = 0; 
//cout << "i " << i << " j " << j << endl; 
      for (int k = 0; k < n; k++){ 
      C[i][j] += A[i][k] * B[k][j];} 
     } 
}//ends the function 
+0

如果您隔离并运行第三种变体,您是否看到所有CPU内核同时处于活动状态(并希望与之挂钩)? ...从代码看来它应该并行工作,但如果出现问题,至少可以快速进行完整性检查。 – Jason 2011-05-01 20:38:10

+0

@jason我不确定如何检查这与Linux,你有什么建议如何做到这一点?我在学校使用电脑,所以我没有root权限。 – tpar44 2011-05-01 21:12:23

+0

什么发行版?在Ubuntu上,我只需进入系统 - >管理 - >系统监视器查看CPU负载... Fedora/RH/SuSE/etc。尽管菜单可能位于不同的地方,但它们具有相同的功能。另外,如果你只有一个命令行,你可以输入'top'来获得一个基于进程的CPU监视器(也提供了许多其他统计信息)。 – Jason 2011-05-01 21:18:46

回答

3

我的猜测是,你正在运行到False Sharing。尝试在p_variantforloop_t使用本地变量:

void p_variantforloop_t(int** C, int** A, int** B, int s, int e, int numThreads, int n) 
{ 
    for(int i = s; i < e; i++) 
     for(int j = 0; j < n; j++){ 
      int accu = 0; 
      for (int k = 0; k < n; k++) 
      accu += A[i][k] * B[k][j]; 
      C[i][j] = accu; 
     } 
} 
+0

好吧,这对我来说很合理,但是当我将它添加到我的程序中时,即使我完全添加了该文章中的内容,它也不会反映在我的输出中 – tpar44 2011-05-01 20:22:32

+0

如果您将整个临时矩阵用作累加器在你的函数中,只是将结果从临时矩阵复制到'p_variantforloop_t'函数末尾的C处,而不是在for循环中间逐个单元地复制? – Jason 2011-05-01 20:43:47

+0

@Jason:我在想同样的事情,但是OP沿着最大步长维度(第一个索引,正确?)划分工作,写入沿着最小步长维度,所以我不太确定它实际上是False Sharing ,除非范围很小。 – 2011-05-01 20:52:23

0

基于在评论你的回应,从理论上讲,因为你只能有一个线程(即CPU),所有的线程版本应该是相同的时间作为单线程版本或更长的时间,因为线程管理开销。你不应该看到任何加速,因为解决矩阵的一部分的时间片是从另一个并行任务中窃取的时间片。只有一个CPU,你只能分时分享CPU的资源 - 在给定的一段时间内没有真正的并行工作。那么我会怀疑你的第二个实现运行得更快的原因是因为你在内部循环中做了更少的指针解引用和内存访问。例如,在multiplyp_variantforloop_t的主操作C[i][j] += A[i][k] * B[k][j];中,您正在查看汇编级别的许多操作,其中许多与内存相关。它看起来像在“组装伪码”下列:

1)从由A栈上所引用的地址移动指针值到寄存器R1
2)由关的值增量寄存器R1地址栈由可变ij,或k
3)将指针地址值的地址所指向的R1R1
4)由值增量R1地址关闭由可变i引用的堆栈引用j,或k
5)从所述地址移动值指向R1R1(所以R1现在持有的A[i][k]的值)
6)执行用于通过B堆栈到寄存器上引用的地址的步骤1-5 R2(所以R2现在持有的B[k][j]的值)
7)执行用于通过C在栈上所引用到寄存器R3
8的地址步骤1-4)将值从地址指向R3R4(即,R4保持在C[i][j]实际值)
9)乘在寄存器寄存器R1R2和存储R5
10)添加寄存器R4R5和存储在R4
11)从R4移动终值回到存储器地址R3(现在的C[i][j]有最终结果)

这就是假设我们有5个通用寄存器可供使用,并且编译器正确地优化了你的C代码以利用它们。我将循环索引变量i,jk留在堆栈上,因此访问它们需要的时间比它们在寄存器中的时间要多得多......这取决于您的编译器在您的平台上需要使用多少个寄存器。另外,如果你没有进行任何优化编译,你可能会从堆栈中进行更多的内存访问,其中一些临时值存储在堆栈而不是寄存器中,然后从堆栈中重新获取,这需要花费更长的时间比在寄存器之间移动值。无论哪种方式,上面的代码都很难优化。它可以工作,但是如果你使用的是32位x86平台,那么你不会有那么多的通用寄存器来玩(尽管你至少应该有6个)。 x86_64有更多的寄存器可供使用,但仍然有所有的内存访问需要处理。

另一方面,在一个紧密的内部循环中,类似于tmp += a[k] * b[k]p_scalarproduct_t的操作将会移动得更快......这里是在装配的伪代码的上述操作:

会有为循环

1)请tmp寄存器R1小初始化步骤,而不是堆栈变量,和初始化它的值设置为0
2 )移动通过a栈上所引用的地址值到R2
3)R2
4添加的s值从堆栈到R2和保存得到的地址)移动通过引用的地址值0堆叠成R3
5)添加的s值从堆栈到R3R3
6)设置保存所得地址初始化为e - s

我们将一次性初始化之后在R6的计数器上开始实际的内环

7)移动的值的地址所指向的R2R4
8)将值从地址指向R3R5
9)乘法R4R5并存储在R5
10)的结果添加R5R1并存储在R1
11的结果)增量在R6R2R3
12)递减计数器直到达到零,我们终止循环

我不能保证这正是你的编译器设置这个循环的方式,但是你可以通过你的标量例子看到在内部循环中需要的步骤少,更重要的是l ess内存访问。因此,更多的操作可以通过仅使用寄存器的操作完成,而不是包含存储器位置的操作,并且需要进行存储器提取,这比仅寄存器操作要慢得多。所以总的来说,它会更快地移动,而这与线程无关。

最后,我注意到你只有标产品两个嵌套的循环,所以它的复杂度为O(N^2),其中,为您的另外两种方法你有Ø三个嵌套循环(N^3)复杂。这也会有所作为。