2017-08-11 84 views
2

我既不是C++专家,也不是并发编程。但是,我正在实现一个简单的推理算法,该算法需要检查许多独立的模型。可能的模型的数量是巨大的,所以我想同时检查它们。并行数组检查

为了使其尽可能简单,我将我原来的问题转化为一个非常简单的问题:如何确定数组是否包含非零值?一个简单的,连续的解决办法是这样的:

bool containsNonZero (int* arr, int len) { 
    for (int i = 0; i < len; ++i) 
     if (arr[i]) return true; 
    return false; 
} 

(注:在现实中,len个无法放入一个INT,但在我原来的问题,不存在阵列,只是众多组合我生成但不存储。)

但是,我需要一个并行(和高效)的实现。有吨=的std ::螺纹:: hardware_concurrency()线程搜索阵列(请注意, < < len个。如果LEN%T!= 0那么它不会使一个问题最后一个线程处理剩余的值)。因此,第一个线程将搜索从到len/t的索引,第二个线程将搜索从len/t到(2 * len)/ t等的索引。最后一个线程将搜索索引从((t-1)* len)/ t到len。如果线程发现非零值,则所有线程都将停止并返回真。否则,他们将等待其他人完成,如果所有线程都同意,将返回false

这似乎很容易,但我在网上找不到任何答案。任何C++版本都是受欢迎的,但我不想依赖任何第三方库。

回答

1

下面是怎么回事?

每个工人检查天气的范围内的元素是非零或如果原子标志设置(意思是其他线程已经找到它)。

以下是每个线程(他们每个人分配了differet范围)执行该功能

template<typename Iterator> 
static void any_of(Iterator & begin, Iterator& end, std::atomic<bool> & result) 
    { 
     for (const auto & it=begin; it!=end; ++it) 
     { 
      if (result || (*it)!=0) 
      { 
       result= true; 
       return; 
      } 
     } 

你可以称它为如下

size_t chunk_size = input.size()/num_threads; 
std::atomic<bool> result(false); 
std::vector<std::thread> threads; 
for (size_t i = 0; i < num_threads; ++i) 
{ 
    const auto & begin = input.begin() + i *chunk_size; 
    const auto & end = input.begin() + std::min((i+1) * chunk_size, input.size()); 
    threads.emplace_back(any_element_of,begin,end,result); 
} 

for (auto & thread : threads) 
    thread.join(); 

这点你可以放心地检查后return来检索你的结果。

请注意,通过将一元谓词函数传递给工作者以使其更一般化,可以轻松扩展此方法。

template<typename Iterator, typename Predicate> 
static void any_of(Iterator & begin, Iterator& end, Predicate pred, std::atomic<bool> & result) 
    { 
     for (const auto & it=begin; it!=end; ++it) 
     { 
      if (result || pred(*it)) 
      { 
       result= true; 
       return; 
      } 
     } 
+3

请注意,如果您检查该标志在每个迭代上,你再次有效序列化的算法,为访问做原子这里招致了一些同步开销(多少恰恰是高度依赖于平台的)。最好在每次* n *次迭代中检查标志,然后为你的平台选择一个相当大的* n *。 – ComicSansMS

+0

@Davide Spataro令人印象深刻!我已经尝试过fork()函数。但它是一个真正的麻烦制造者。是范围越野车的计算? 如果input.size()== 1001和num_threads == 4,那么我猜测上一个值没有被检查。如果我错了,请原谅我。 –

+0

@ComicSansMS这是一个非常好的观点。为每个线程选择(或随机生成)不同的_n_以避免可能的冲突是一个好主意吗? –

2

我试图扩大达维德斯帕塔罗的解决方案使用atomic_flag它“不同于标准::原子的所有专业,它是保证是无锁的”解决atomic<bool>同步的问题http://en.cppreference.com/w/cpp/atomic/atomic_flag

编辑: 不相关的前一个问题,但我已经基准哪种方法更快,什么是我的惊喜atomic<bool>有约100快于atomic_flag

基准测试结果:

num_threads:2 
400000001 iterations flag 
401386195 iterations flag 
atomic_flag : it took 24.1202 seconds. Result: 1 
400000001 iterations bool 
375842699 iterations bool 
atomic<bool>: it took 0.334785 seconds. Result: 1 
num_threads:3 
229922451 iterations flag 
229712046 iterations flag 
233333335 iterations flag 
atomic_flag : it took 21.5974 seconds. Result: 1 
219564626 iterations bool 
233333335 iterations bool 
196877803 iterations bool 
atomic<bool>: it took 0.200942 seconds. Result: 1 
num_threads:4 
151745683 iterations flag 
150000001 iterations flag 
148849108 iterations flag 
148933269 iterations flag 
atomic_flag : it took 18.6651 seconds. Result: 1 
150000001 iterations bool 
112825220 iterations bool 
151838008 iterations bool 
112857688 iterations bool 
atomic<bool>: it took 0.167048 seconds. Result: 1 

基准代码:

#include <thread> 
#include <atomic> 
#include <vector> 
#include <iostream> 
#include <algorithm> 



template<typename Iterator> 
static void any_of_flag(Iterator & begin, Iterator& end, std::atomic_flag & result) 
{ 
    int counter = 0; 
    for (auto it = begin; it != end; ++it) 
    { 
     counter++; 
     if (!result.test_and_set() || (*it) != 0) 
     { 
      result.clear(); 
      std::cout << counter << " iterations flag\n"; 
      return; 
     } 
    } 
} 
template<typename Iterator> 
static void any_of_atomic(Iterator & begin, Iterator& end, std::atomic<bool> & result) 
{ 
    int counter = 0; 
    for (auto it = begin; it != end; ++it) 
    { 
     counter++; 
     if (result || (*it) != 0) 
     { 
      result = true; 
      std::cout << counter << " iterations bool\n"; 
      return; 
     } 
    } 
} 

void test_atomic_flag(std::vector<int>& input, int num_threads) 
{ 

    using namespace std::chrono; 

    high_resolution_clock::time_point t1 = high_resolution_clock::now(); 


    size_t chunk_size = input.size()/num_threads; 
    std::atomic_flag result = ATOMIC_FLAG_INIT; 
    result.test_and_set(); 

    std::vector<std::thread> threads; 
    for (size_t i = 0; i < num_threads; ++i) 
    { 
     auto & begin = input.begin() + i *chunk_size; 
     auto & end = input.begin() + std::min((i + 1) * chunk_size, input.size()); 
     // had to use lambda in VS 2017 
     threads.emplace_back([&begin, &end, &result] {any_of_flag(begin, end, result); }); 

    } 

    for (auto & thread : threads) 
     thread.join(); 

    bool hasNonZero = !result.test_and_set(); 


    high_resolution_clock::time_point t2 = high_resolution_clock::now(); 

    duration<double> time_span = duration_cast<duration<double>>(t2 - t1); 

    std::cout << "atomic_flag : it took " << time_span.count() << " seconds. Result: " << hasNonZero << std::endl; 
} 



void test_atomic_bool(std::vector<int>& input, int num_threads) 
{ 

    using namespace std::chrono; 

    high_resolution_clock::time_point t1 = high_resolution_clock::now(); 


    size_t chunk_size = input.size()/num_threads; 
    std::atomic<bool> result(false); 

    std::vector<std::thread> threads; 
    for (size_t i = 0; i < num_threads; ++i) 
    { 
     auto & begin = input.begin() + i *chunk_size; 
     auto & end = input.begin() + std::min((i + 1) * chunk_size, input.size()); 
     // had to use lambda in VS 2017 
     threads.emplace_back([&begin, &end, &result] {any_of_atomic(begin, end, result); }); 

    } 

    for (auto & thread : threads) 
     thread.join(); 

    bool hasNonZero = result; 


    high_resolution_clock::time_point t2 = high_resolution_clock::now(); 

    duration<double> time_span = duration_cast<duration<double>>(t2 - t1); 

    std::cout << "atomic<bool>: it took " << time_span.count() << " seconds. Result: " << hasNonZero << std::endl; 
} 

int main() 
{ 
    std::vector<int> input(1e9, 0); 
    input[1e9 - 1e8] = 1; 
    for (int num_threads : {2, 3, 4}) 
    { 
     std::cout << "num_threads:" << num_threads << std::endl; 
     test_atomic_flag(input, num_threads); 
     test_atomic_bool(input, num_threads); 
    } 

    int q; 
    std::cin >> q; 
    return 0; 
}; 

旧文章: 我有一些问题,迭代器的常量性和安放线程,但核心的变化,那就是atomic_flag的使用似乎上班。它不会立即停止所有线程,但在最坏的情况下,每次迭代只会有一个线程(因为每次迭代只有一个线程会知道它应该由于清除标志而停止)。

#include <thread> 
#include <atomic> 
#include <vector> 
#include <iostream> 
#include <algorithm> 

template<typename Iterator> 
static void any_of(Iterator & begin, Iterator& end, std::atomic_flag & result) 
{ 
    for (auto it = begin; it != end; ++it) 
    { 
     if (!result.test_and_set() || (*it) != 0) 
     { 
      result.clear(); 
      return; 
     } 
    } 
} 

int main() 
{ 
    int num_threads = 3; 
    std::vector<int> input = { 0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0, 1,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0}; 
    size_t chunk_size = input.size()/num_threads; 
    std::atomic_flag result = ATOMIC_FLAG_INIT; 
    result.test_and_set(); 

    std::vector<std::thread> threads; 
    for (size_t i = 0; i < num_threads; ++i) 
    { 
     auto & begin = input.begin() + i *chunk_size; 
     auto & end = input.begin() + std::min((i + 1) * chunk_size, input.size()); 
     // had to use lambda in VS 2017 
     threads.emplace_back([&begin, &end, &result] {any_of(begin, end, result); }); 

    } 

    for (auto & thread : threads) 
     thread.join(); 

    bool hasNonZero = !result.test_and_set(); 
    return 0; 
}; 
+0

让我们来比较时间。使用更大的输入,像10e9元素。 'constexpr size_t lim = 10e9;向量 input(lim,0);'和不同的线程数,如果你的CPU至少有4个内核,则{{1,2,3,4}}。 –

+0

@DavideSpataro我已经做到了,对我来说令人惊讶的是,你的方法似乎要快得多。问题依然:为什么? – R2RT

+0

@ R2RT原子“原子”比“atomic_flag”更快可能是因为只能通过调用'test_and_set()'来从'atomic_flag'获得值。这是一种原子读取 - 修改 - 写入,与常规负载相比,它非常昂贵。 – LWimsey