2

我通过std::map必须循环,并且必须在每个迭代中完成任务具有以下属性:OpenMP的:通过循环“的std ::地图”基准(动态调度)

  1. 的工作量在每次迭代中变化;
  2. 线程之间不需要任何同步。

看起来像动态调度的完美场景,不是吗?

但是,非循环访问迭代器(例如std::map has)在涉及OpenMP的循环并行化方面令人头疼。对我来说,这种特殊代码的性能将是至关重要的,因此在寻找我创建了以下基准的最有效的解决方案:

#include <omp.h> 

#include <iostream> 
#include <map> 
#include <vector> 

#define COUNT 0x00006FFF 

#define UNUSED(variable) (void)(variable) 

using std::map; 
using std::vector; 

void test1(map<int, vector<int> >& m) { 
    double time = omp_get_wtime(); 

    map<int, vector<int> >::iterator iterator = m.begin(); 

#pragma omp parallel 
#pragma omp for schedule(dynamic, 1) nowait 
    for (size_t i = 0; i < m.size(); ++i) { 
    vector<int>* v; 
#pragma omp critical 
    v = &iterator->second; 

    for (size_t j = 0; j < v->size(); ++j) { 
     (*v)[j] = j; 
    } 

#pragma omp critical 
    iterator++; 
    } 

    printf("Test #1: %f s\n", (omp_get_wtime() - time)); 
} 

void test2(map<int, vector<int> >& m) { 
    double time = omp_get_wtime(); 

#pragma omp parallel 
    { 
    for (map<int, vector<int> >::iterator i = m.begin(); i != m.end(); ++i) { 
#pragma omp single nowait 
     { 
     vector<int>& v = i->second; 

     for (size_t j = 0; j < v.size(); ++j) { 
      v[j] = j; 
     } 
     } 
    } 
    } 

    printf("Test #2: %f s\n", (omp_get_wtime() - time)); 
} 

void test3(map<int, vector<int> >& m) { 
    double time = omp_get_wtime(); 

#pragma omp parallel 
    { 
    int thread_count = omp_get_num_threads(); 
    int thread_num = omp_get_thread_num(); 
    size_t chunk_size = m.size()/thread_count; 
    map<int, vector<int> >::iterator begin = m.begin(); 
    std::advance(begin, thread_num * chunk_size); 
    map<int, vector<int> >::iterator end = begin; 
    if (thread_num == thread_count - 1) 
     end = m.end(); 
    else 
     std::advance(end, chunk_size); 

    for (map<int, vector<int> >::iterator i = begin; i != end; ++i) { 
     vector<int>& v = i->second; 

     for (size_t j = 0; j < v.size(); ++j) { 
     v[j] = j; 
     } 
    } 
    } 

    printf("Test #3: %f s\n", (omp_get_wtime() - time)); 
} 

int main(int argc, char** argv) { 
    UNUSED(argc); 
    UNUSED(argv); 

    map<int, vector<int> > m; 

    for (int i = 0; i < COUNT; ++i) { 
    m[i] = vector<int>(i); 
    } 

    test1(m); 
    test2(m); 
    test3(m); 
} 

有3种,我能想出模仿可能的变体我的任务。代码非常简单并且可以说明问题,请看看它。我已经跑了测试多次,这里是我的结果:

Test #1: 0.169000 s 
Test #2: 0.203000 s 
Test #3: 0.194000 s 

Test #1: 0.167000 s 
Test #2: 0.203000 s 
Test #3: 0.191000 s 

Test #1: 0.182000 s 
Test #2: 0.202000 s 
Test #3: 0.197000 s 

Test #1: 0.167000 s 
Test #2: 0.187000 s 
Test #3: 0.211000 s 

Test #1: 0.168000 s 
Test #2: 0.195000 s 
Test #3: 0.192000 s 

Test #1: 0.166000 s 
Test #2: 0.197000 s 
Test #3: 0.199000 s 

Test #1: 0.184000 s 
Test #2: 0.198000 s 
Test #3: 0.199000 s 

Test #1: 0.167000 s 
Test #2: 0.202000 s 
Test #3: 0.207000 s 

我张贴这个问题,因为我发现这些结果奇特而万万想不到:

  1. 预期的测试#2是最快的,因为它不使用关键部分作为测试#1;
  2. 预期测试#3是最慢的,因为它没有真正利用动态调度,而是依赖作业的静态分配(手动完成);
  3. 可能从来没有预料到测试#2大致等于测试#3,有时甚至更糟。

的问题是:

  1. 我错过了什么?
  2. 你能解释一下测试结果吗?
  3. 您是否有更好的并行化思路?
+0

您是否尝试过使用unordered_map进行基准测试? – Leeor 2014-12-02 18:33:30

+0

@Leeor,我为什么?它在这种情况下不会改变。这个问题不是关于数据结构的圣战。它是关于并行化策略。最后,我对这个特定任务的'unordered_map'不感兴趣。 – 2014-12-02 18:38:03

回答

2
  • 你有并行这里更好的主意吗?
  • 您可能会尝试模仿schedule(static,1)的OpenMP的循环,即代替处理连续迭代的一大块,一个线程处理迭代与thread_count大步。下面是代码:

    void test4(map<int, vector<int> >& m) { 
        double time = omp_get_wtime(); 
    
    #pragma omp parallel 
        { 
        int thread_count = omp_get_num_threads(); 
        int thread_num = omp_get_thread_num(); 
        size_t map_size = m.size(); 
        map<int, vector<int> >::iterator it = m.begin(); 
        std::advance(it, thread_num); 
    
        for (int i = thread_num; i < map_size; i+=thread_count) { 
         vector<int>& v = it->second; 
    
         for (size_t j = 0; j < v.size(); ++j) { 
         v[j] = j; 
         } 
    
         if(i+thread_count < map_size) std::advance(it, thread_count); 
        } 
        } 
    
        printf("Test #4: %f s\n", (omp_get_wtime() - time)); 
    } 
    

    schedule(static,1)提供比schedule(static)更好的负载平衡的情况下,工作的量增加或减少在迭代空间。测试工作量就是这种情况。如果每次迭代的工作量是随机的,那么这两种策略应平均给出相同的平衡。

    另一个变体是在原子计数器的帮助下模仿schedule(dynamic)。代码:

    void test5(map<int, vector<int> >& m) { 
        double time = omp_get_wtime(); 
        int count = 0; 
    #pragma omp parallel shared(count) 
        { 
        int i; 
        int i_old = 0; 
        size_t map_size = m.size(); 
        map<int, vector<int> >::iterator it = m.begin(); 
    
    #pragma omp atomic capture 
        i = count++; 
    
        while (i < map_size) { 
         std::advance(it, i-i_old); 
         vector<int>& v = it->second; 
    
         for (size_t j = 0; j < v.size(); ++j) { 
         v[j] = j; 
         } 
    
         i_old = i; 
    #pragma omp atomic capture 
         i = count++; 
        } 
        } 
    
        printf("Test #5: %f s\n", (omp_get_wtime() - time)); 
    } 
    

    在一个循环中,线程决定它应该在地图上推进其本地迭代器的数量。线程首先自动递增计数器并取其先前的值,从而获得迭代索引,然后使迭代器前进到新索引与前一索引之间的差值。循环重复,直到计数器增加到地图大小以上。