2017-09-23 233 views
2

我正在优化一个函数,我想摆脱慢循环。我正在寻找一种更快的方法来将矩阵的每一行乘以一个向量。将矩阵的行乘以一个向量(低级优化)?

我不是在寻找一种'古典'的乘法。

例如,我有一个矩阵,有1024列,20行,矢量长度为1024.结果,我想有矩阵1024 x 20,每行乘以矢量。

我现在在做什么我在for循环遍历矩阵行并使用mkl v?Mul执行当前矩阵行和向量的元素乘法。任何想法如何改善这一点?

的问题是,复制Multiply rows of matrix by vector? 但对于C++可能出现的低级别的优化和MKL,而不是针对R

+1

我假设你的意思是1024行和20列?是固定的20(或在编译时已知并保证是4的倍数)?你的矩阵存储rowmajor或columnmajor? – chtz

+0

@chtz 1024列 - 即功能。在另一种情况下是52列。两者都是固定的,是4的倍数。而20是批量大小。我选择了它,但它不可能很大。并且有许多这样的乘法迭代。 –

+0

我可能误解了你想要做的事情。如果你想为每一行逐个元素地进行乘法运算,那么你的矩阵应该有与矢量元素一样多的列,不是吗? (如果这不是你想要的,请写一些伪代码,或者你正在使用的代码) – chtz

回答

2

使用Eigen matrix library,你在做什么本质上是由对角矩阵相乘。如果你有任意多行和20列的矩阵,你可以写以下(不值得做一个函数,该函数):

void multRows(Eigen::Matrix<double, Eigen::Dynamic, 20>& mat, 
       const Eigen::Matrix<double,20,1>& vect) 
{ 
    mat = mat * vect.asDiagonal(); 
} 

征做,如果它是由编译器生成激活代码AVX2。 您可能想要试验在您的用例中存储mat行主要或列主要是否更有效。

附录(由于编辑的问题):如果你有(多)超过20列,你应该只使用动态大小的矩阵全在一起:

void multRows(Eigen::MatrixXd& mat, const Eigen::VectorXd& vect) 
{ 
    mat = mat * vect.asDiagonal(); 
} 
+0

在数学上你是正确的,但是:1)对角线矩阵的额外存储器分配是昂贵的(所有这些零)2)矩阵乘法是昂贵的操作,然后元素乘法(因为缓存未命中)。所以我几乎肯定这会比我目前的方法慢 –

+0

Eigen不会分配一个完整的矩阵来存储对角线(事实上,该乘法不会发生任何分配)。这基本上优化为两个元素智能标量产品循环,如果可能的话,利用SIMD。 – chtz

+0

你说得对,这种方法很快。同时mat.array()。rowwise()* = vect.array();是相同还是稍快。并且比循环+ mkl v快15%?Mul –

2

最近期的处理器支持AVX技术。它提供了一个包含4个双精度(256位寄存器)的向量。因此,这种优化的解决方案可能会使用AVX。为此,我使用x86intrin.h库实现它,它是GCC编译器的一部分。我还使用OpenMP使解决方案成为多线程。

//gcc -Wall -fopenmp -O2 -march=native -o "MatrixVectorMultiplication" "MatrixVectorMultiplication.c" 
//gcc 7.2, Skylake Corei7-6700 HQ 
//The performance improvement is significant (5232 Cycle in my machine) but MKL is not available to test 
#include <stdio.h> 
#include <x86intrin.h> 
double A[20][1024] __attribute__((aligned(32))) = {{1.0, 2.0, 3.0, 3.5, 1.0, 2.0, 3.0, 3.5}, {4.0, 5.0, 6.0, 6.5,4.0, 5.0, 6.0, 6.5},{7.0, 8.0, 9.0, 9.5, 4.0, 5.0, 6.0, 6.5 }};//The 32 is for 256-bit registers of AVX 
double B[1024] __attribute__((aligned(32))) = {2.0, 2.0, 2.0, 2.0, 3.0, 3.0, 3.0, 3.0 }; //the vector 
double C[20][1024] __attribute__((aligned(32)));//the results are stored here 

int main() 
{ 
    int i,j; 
    __m256d vec_C1, vec_C2, vec_C3, vec_C4; 

    //begin_rdtsc 
    //get the start time here 
    #pragma omp parallel for 
    for(i=0; i<20;i++){ 
     for(j=0; j<1024; j+=16){ 

      vec_C1 = _mm256_mul_pd(_mm256_load_pd(&A[i][j]), _mm256_load_pd(&B[j])); 
      _mm256_store_pd(&C[i][j], vec_C1); 

      vec_C2 = _mm256_mul_pd(_mm256_load_pd(&A[i][j+4]), _mm256_load_pd(&B[j+4])); 
      _mm256_store_pd(&C[i][j+4], vec_C2); 

      vec_C3 = _mm256_mul_pd(_mm256_load_pd(&A[i][j+8]), _mm256_load_pd(&B[j+8])); 
      _mm256_store_pd(&C[i][j+8], vec_C3); 

      vec_C4 = _mm256_mul_pd(_mm256_load_pd(&A[i][j+12]), _mm256_load_pd(&B[j+12])); 
      _mm256_store_pd(&C[i][j+12], vec_C4); 

     } 
    } 
    //end_rdtsc 
    //calculate the elapsead time 

    //print the results 
    for(i=0; i<20;i++){ 
     for(j=0; j<1024; j++){ 
      //printf(" %lf", C[i][j]); 
     } 
     //printf("\n"); 
    } 

    return 0; 
}