2016-12-14 54 views
2

我想并行化一个循环(使用tbb),其中包含一些昂贵但可矢量化的迭代(随机扩展)。我的想法是缓冲这些并刷新缓冲区,只要它达到矢量大小。这样的缓冲区必须是线程本地的。例如,并行循环结束时使用TBB刷新线程本地缓冲区

// dummy for testing 
void do_vectorized_work(size_t k, size_t*indices) 
{} 
// dummy for testing 
bool requires_expensive_work(size_t k) 
{ return (k&7)==0; } 

struct buffer 
{ 
    size_t K=0, B[vector_size]; 
    void load(size_t i) 
    { 
    B[K++]=i; 
    if(K==vector_size) 
     flush(); 
    } 
    void flush() 
    { 
    do_vectorized_work(K,B); 
    K=0; 
    } 
}; 

void do_work_in_parallel(size_t N) 
{ 
    tbb::enumerable_thread_specific<buffer> tl_buffer; 

    tbb::parallel_for(size_t(0),N,[&](size_t i) 
    { 
    if(requires_expensive_work(i)) 
     tl_buffer.local().load(i); 
    }); 
} 

然而,这留下缓冲区非空的,所以我还是要最后一次刷新他们每个人的

for(auto&b:tl_buffer) 
    b.flush(); 

但这是串行!当然,我也可以尝试这样做并行

using tl_range = typename tbb::enumerable_thread_specific<buffer>::range_type; 
tbb::parallel_for(tl_buffer.range(),[](tl_range const&range) 
{ 
    for(auto r:range) 
    r->flush(); 
}); 

但我不知道这是有效的(因为只有尽可能多的缓冲区有线程)。我想知道是否有可能避免事件发生后的最后冲洗。即是否可以使用tbb::task s(替换tbb::parallel_for),以便每个线程的最终任务是刷新其缓冲区?

回答

1

它发生,我认为这可以通过减少解决。

struct buffer 
{ 
    std::size_t K=0, B[vector_size]; 
    void load(std::size_t i) 
    { 
    B[K++]=i; 
    if(K==vector_size) flush(); 
    } 
    void flush() 
    { 
    do_vectorized_work(K,B); 
    K=0; 
    } 
    buffer(buffer const&, tbb::split) 
    {} 
    void operator()(tbb::block_range<std::size_t> const&range) 
    { for(i:range) load(i); } 
    bool empty() 
    { return K==0; } 
    std::size_t pop() 
    { return K? B[--K] : 0; } 
    void join(buffer&rhs) 
    { while(!rhs.empty()) load(rhs.pop()); } 
}; 

void do_work_in_parallel(std::size_t N) 
{ 
    buffer buff; 
    tbb::parallel_reduce(tbb::block_range<std::size_t>(0,N,vector_size),buff); 
    if(!buff.empty()) 
    buff.flush(); 
} 
2

不,工作线程没有关于这个特定任务是否是给定工作的最后一项任务的完整信息(这是工作窃取如何工作的)。因此,不可能在parallel_for或调度程序本身上实现这样的功能。因此,我建议你采用你描述的这两种方法。

你可以对此做两件事。

  • 使其异步。即排队一个任务,这将使所有的东西都被刷新。这将有助于从主线程的热路径中删除此代码。如果在完成此任务时需要设置任何依赖项,请注意。
  • 使用tbb::task_scheduler_observer为了初始化线程特定的数据,并在线程关闭或没有工作时保持一段时间时懒惰地释放它。后者需要使用local observer feature,这尚未正式支持,但已经持续了几年。

例子:

#define TBB_PREVIEW_LOCAL_OBSERVER 1 
#include <tbb/tbb.h> 
#include <assert.h> 

typedef void * buffer_t; 
const static int bufsz = 1024; 
class thread_buffer_allocator: public tbb::task_scheduler_observer { 
    tbb::enumerable_thread_specific<buffer_t> _buf; 
public: 
    thread_buffer_allocator() 
    : tbb::task_scheduler_observer(/*local=*/ true) { 
    observe(true); // activate the observer 
    } 
    ~thread_buffer_allocator() { 
    observe(false); // deactivate the observer 
    for(auto &b : _buf) { 
     printf("destructor: cleared: %p\n", b); 
     free(b); 
    } 
    } 
    /*override*/ void on_scheduler_entry(bool worker) { 
    assert(_buf.local() == nullptr); 
    _buf.local() = malloc(bufsz); 
    printf("on entry: %p\n", _buf.local()); 
    } 
    /*override*/ void on_scheduler_exit(bool worker) { 
    printf("on exit\n"); 
    if(_buf.local()) { 
     printf("on exit: cleared %p\n", _buf.local()); 
     free(_buf.local()); 
     _buf.local() = nullptr; 
    } 
    } 
}; 

int main() { 
    thread_buffer_allocator buffers_scope; 
    tbb::parallel_for(0, 1024*1024*1024, [&](auto i){ 
    usleep(i%3); 
    }); 
    return 0; 
} 
+0

感谢您的支持。我不认为异步方法比我在OP中描述的尝试更好。使用'tbb :: task_scheduler_observer'的方法听起来很有趣。你可以使用代码片段概述这将如何工作? – Walter

+0

@Walter更新。虽然我只在线上编译器上尝试过,但是它并没有与当地观察员进行最近的TBB测试:http://coliru.stacked-crooked.com/a/11728cd935579cfe – Anton