2016-08-16 54 views
0

我有一个range-image并想将其转换为libpointmatcher point cloud。云是一个Eigen::Matrix,每行4行(x,y,z,1)和几列。 范围图像是包含范围值(z)的unsigned short*阵列和包含有关像素可见性信息的unsigned char*阵列。使用tbb从阵列中保留并行顺序选择

在串行,我的代码看起来是这样的:

//container to hold the data 
std::vector<Eigen::Vector4d> vec; 
vec.reserve(this->Height*this->Width); 

//contains information about pixel visibility 
unsigned char* mask_data = (unsigned char*)range_image.mask.ToPointer(); 
//contains the actual pixel data 
unsigned short* pixel_data = (unsigned short*)range_image.pixel.ToPointer(); 

for (int y =0;y < range_image.Height; y++) 
{ 
    for (int x = 0; x < range_image.Width; x++) 
    { 
     int index =x+y*range_image.Width; 
     if(*(mask_data+index) != 0) 
     {    
      vec.push_back(Eigen::Vector4d(x,y,(double)*(data+index),1)); 
     }    
    } 
} 
// libpointmatcher point cloud with size of visible pixel 
PM::Matrix features(4,vec.size()); 
PM::DataPoints::Labels featureLabels; 
featureLabels.resize(4); 
featureLabels[0] = PM::DataPoints::Label::Label("x"); 
featureLabels[1] = PM::DataPoints::Label::Label("y"); 
featureLabels[2] = PM::DataPoints::Label::Label("z"); 
featureLabels[3] = PM::DataPoints::Label::Label("pad"); 

//fill with data 
for(int i = 0; i<vec.size(); i++) 
{ 
    features.col(i) = vec[i]; 
} 

因为这个循环需要500ms的为84万点,那太慢的大图像。现在我的想法是将上面的代码集成到一个parallized函数中。问题是Eigen::Matrix不提供push_back功能,我不知道可见点的数量提前,我需要在正确的顺序点处理点云。

所以我需要一个并行算法从我的范围图像中提取可见的3D点并将它们按照正确的顺序插入到Eigen :: Matrix中。我正在与Microsoft Visual Studio 2012,我可以使用OpenMP 2.0TBB。我感谢所有帮助:)

UPDATE

由于拱D.罗宾逊suggeested我试过tbb::parallel_scan。我传递了掩码数组和一个双数组来保存三维坐标。输出数组的大小是输入数组的四倍,以存储均匀的3D数据(x,y,z,1)。然后,我将otput数组映射到Eigen :: Matrix。行的数量是固定的,并且cols来自parallel_scan的结果。

size_t vec_size = width*height; 
double* out = new double[vec_size * 4]; 
size_t m1 = Compress(mask, pixel, out, height, width, 
[](unsigned char x) {return x != 0; }); 
Map<MatrixXd> features(out, 4, m1); 

。下面是从operator()代码:

void operator()(const tbb::blocked_range2d<size_t, size_t>& r, Tag) { 
    // Use local variables instead of member fields inside the loop, 
    // to improve odds that values will be kept in registers. 
    size_t j = sum; 
    const unsigned char* m = in; 
    const unsigned short* p = in2; 
    T* values = out; 
    size_t yend = r.rows().end(); 
    for (size_t y = r.rows().begin(); y != yend; ++y) 
    { 
     size_t xend = r.cols().end(); 
     for (size_t x = r.cols().begin(); x != xend; ++x) 
     { 
      size_t index = x + y*width; 
      if (pred(m[index])) 
      { 
       if (Tag::is_final_scan()) 
       { 
        size_t idx = j*4; 
        values[idx] = (double)x; 
        values[idx + 1] = (double)y; 
        values[idx + 2] = p[index]; 
        values[idx + 3] = 1.0; 
       } 
       ++j; 
      } 
     } 
    } 
    sum = j; 
} 

我现在快4倍,然后串行版本。你对这种方法有什么看法?我错过了任何想法,并有改进?谢谢

+0

如果你需要的是的std :: copy_if的逻辑等价物,可以考虑使用TBB :: parallel_scan(https://software.intel.com /sites/default/files/bc/2b/parallel_scan.pdf)。 “最终扫描”阶段可以计算最终的目的地指数(作为“成功案例”指数的总和,并进行有条件的分配。) –

+0

@ArchD。Robison可以给我一个关于parallel_scan要求的代码示例(Body,reverse_join_assign)?我不知道该怎么做:/什么是最好的结构来持有指数,我如何将它们合并到最终的扫描中?请帮助我:) – PSchn

回答

1

下面是一个如何做类似std::copy_if usingtbb::parallel_scan的例子。关键的方法是operator(),通常每个子范围调用两次,一次用于预扫描,一次用于最终扫描。 (但是请注意,TBB在没有必要时会省略预扫描。)在这里,预扫描只是进行计数,最后的扫描完成最后的工作(其中包括重放计数)。有关这些方法的更多详细信息,请参阅https://software.intel.com/sites/default/files/bc/2b/parallel_scan.pdf。另一个很好的参考是https://www.cs.cmu.edu/~guyb/papers/Ble93.pdf,它显示了很多你可以用平行扫描(又名前缀总和)做的事情。

```

#include "tbb/parallel_scan.h" 
#include "tbb/blocked_range.h" 
#include <cstddef> 

template<typename T, typename Pred> 
class Body { 
    const T* const in; 
    T* const out; 
    Pred pred; 
    size_t sum; 
public: 
    Body(T* in_, T* out_, Pred pred_) : 
     in(in_), out(out_), pred(pred_), sum(0) 
    {} 
    size_t getSum() const {return sum;} 
    template<typename Tag> 
    void operator()(const tbb::blocked_range<size_t>& r, Tag) { 
     // Use local variables instead of member fields inside the loop, 
     // to improve odds that values will be kept in registers. 
     size_t j = sum; 
     const T* x = in; 
     T* y = out; 
     for(size_t i=r.begin(); i<r.end(); ++i) { 
      if(pred(x[i])) { 
       if(Tag::is_final_scan()) 
        y[j] = x[i]; 
       ++j; 
      } 
     } 
     sum = j; 
    } 
    // Splitting constructor used for parallel fork. 
    // Note that it's sum(0), not sum(b.sum), because this 
    // constructor will be used to compute a partial sum. 
    // Method reverse_join will put together the two sub-sums. 
    Body(Body& b, tbb::split) : 
     in(b.in), out(b.out), pred(b.pred), sum(0) 
    {} 
    // Join partial solutions computed by two Body objects. 
    // Arguments "this" and "a" correspond to the splitting 
    // constructor arguments "b" and "this". That's why 
    // it's called a reverse join. 
    void reverse_join(Body& a) { 
     sum += a.sum; 
    } 
    void assign(Body& b) {sum=b.sum;} 
}; 

// Copy to out each element of in that satisfies pred. 
// Return number of elements copied. 
template<typename T, typename Pred> 
size_t Compress(T* in, T* out, size_t n, Pred pred) { 
    Body<T,Pred> b(in,out,pred); 
    tbb::parallel_scan(tbb::blocked_range<size_t>(0,n), b); 
    return b.getSum(); 
} 

#include <cmath> 
#include <algorithm> 
#include <cassert> 

int main() { 
    const size_t n = 10000000; 
    float* a = new float[n]; 
    float* b = new float[n]; 
    float* c = new float[n]; 
    for(size_t i=0; i<n; ++i) 
     a[i] = std::cos(float(i)); 
    size_t m1 = Compress(a, b, n, [](float x) {return x<0;}); 
    size_t m2 = std::copy_if(a, a+n, c, [](float x) {return x<0;})-c; 
    assert(m1==m2); 
    for(size_t i=0; i<n; ++i) 
     assert(b[i]==c[i]); 
} 

```

+0

感谢您的回答。我会尽快尝试。你看了10000000的大小。你能说一些关于性价比较低的东西吗?像100万或只是100 thousend? – PSchn

+0

这一切都取决于'pred'的重量级操作对你的情况有多重要。如果它只有几个周期(例如整数“上的”<),那么扫描可能是“大核心”Xeons上的一个丢失命题,因为内存传输的开销会淹没计算。在骑士登陆时,可能会有一些加速的希望。如果你的“预测”是重量级的,你会有更好的运气。 –

0

为什么不检查条件*(m_maskData+index)==0之前m_features(0,index) = x;

+0

是的,我会做到这一点。问题是我不知道可见像素的数量,并且parallel_for没有按照特定的顺序执行。但我需要的可见像素的顺序与它们在图像中的顺序相同。 – PSchn