2017-02-16 36 views
0

我正在C++中进行多线程。这可能是非常标准的东西,但我似乎无法在任何地方找到它或知道任何关键术语来在线搜索它。如何搜索下一个可用的线程来计算

我想多次执行某种计算,但有多个线程。对于每次迭代的计算,我想找到下一个可用线程完成其前一次计算以进行下一次迭代。我不想按顺序循环线程,因为下一个被调用的线程可能还没有完成它的工作。

E.g. 假设我有一个int向量,并且我想总计有5个线程的总数。我有要更新的总额存储在某个地方和我目前正在使用的元素的数量。每个线程都会查看计数以查看下一个位置,然后获取该矢量值并将其添加到总和中。然后它返回寻找计数来做下一次迭代。因此,对于每次迭代,计数递增然后查找下一个可用线程(也许一个已经在等待计数;或者可能它们仍然忙于工作)来执行下一次迭代。我们不增加线程的数量,但是我希望能够以某种方式搜索第一个完成下一个计算的所有5个线程。

我将如何去编码这个。我所知道的每一种方式都涉及到通过线程进行循环,以至于我无法检查下一个可能出现故障的可用程序。

+0

为了记录,总结一个'vector'是通过工作者线程协调任务的一个可怕情况,这些线程急切地从一组共同的值中抽取出来;要做的工作量很小,同步的成本只能确保一次计算的每个值都很高。预先对数据进行分区更有意义,因为它消除了对同步的需求(除了在结合其结果之前等待所有线程完成),并且可以预测每个线程的数据访问模式(适用于任何内存系统预取启发式)。 – ShadowRanger

回答

0

在全局变量上使用semafor(或互斥体,总是混淆这两个),告诉你下一步是什么。只要您访问变量使得线程访问清除,semafor将锁定其他线程。

所以,假设你有一个X元素的数组。和全球名为nextfree女巫initalized为0,那么psudo代码是这样的:

while (1) 
{ 
    <lock semafor INT> 
    if (nextfree>=X) 
    { 
     <release semnafor INT> 
     <exit and terminate thread> 
    } 
    <Get the data based on "nextfree"> 
    nextfree++; 
    <release semafor INT> 

    <do your stuff withe the chunk you got> 
} 

这里的关键是,每个线程将会单独有semafor锁内EXLUSIVE访问数据结构和因此无论其他人在做什么,都可以访问下一个。 (其他线程必须等待,如果他们完成,而另一个线程正在获取下一个数据块。当你释放只有队列中的一个将获得访问权限,其余的将不得不等待。)

那里有些东西需要保护。如果您设法退出错误的位置(使用释放它)或创建死锁,Semafor可能会锁定您的系统。

0

这是一个线程池:

template<class T> 
struct threaded_queue { 
    using lock = std::unique_lock<std::mutex>; 
    void push_back(T t) { 
    { 
     lock l(m); 
     data.push_back(std::move(t)); 
    } 
    cv.notify_one(); 
    } 
    boost::optional<T> pop_front() { 
    lock l(m); 
    cv.wait(l, [this]{ return abort || !data.empty(); }); 
    if (abort) return {}; 
    auto r = std::move(data.back()); 
    data.pop_back(); 
    return std::move(r); 
    } 
    void terminate() { 
    { 
     lock l(m); 
     abort = true; 
     data.clear(); 
    } 
    cv.notify_all(); 
    } 
    ~threaded_queue() 
    { 
    terminate(); 
    } 
private: 
    std::mutex m; 
    std::deque<T> data; 
    std::condition_variable cv; 
    bool abort = false; 
}; 
struct thread_pool { 
    thread_pool(std::size_t n = 1) { start_thread(n); } 
    thread_pool(thread_pool&&) = delete; 
    thread_pool& operator=(thread_pool&&) = delete; 
    ~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks 
    template<class F, class R=std::result_of_t<F&()>> 
    std::future<R> queue_task(F task) { 
    std::packaged_task<R()> p(std::move(task)); 
    auto r = p.get_future(); 
    tasks.push_back(std::move(p)); 
    return r; 
    } 
    template<class F, class R=std::result_of_t<F&()>> 
    std::future<R> run_task(F task) { 
    if (threads_active() >= total_threads()) { 
     start_thread(); 
    } 
    return queue_task(std::move(task)); 
    } 
    void terminate() { 
    tasks.terminate(); 
    } 
    std::size_t threads_active() const { 
    return active; 
    } 
    std::size_t total_threads() const { 
    return threads.size(); 
    } 
    void clear_threads() { 
    terminate(); 
    threads.clear(); 
    } 
    void start_thread(std::size_t n = 1) { 
    while(n-->0) { 
     threads.push_back(
     std::async(std::launch::async, 
      [this]{ 
      while(auto task = tasks.pop_front()) { 
       ++active; 
       try{ 
       (*task)(); 
       } catch(...) { 
       --active; 
       throw; 
       } 
       --active; 
      } 
      } 
     ) 
    ); 
    } 
    } 
private: 
    std::vector<std::future<void>> threads; 
    threaded_queue<std::packaged_task<void()>> tasks; 
    std::atomic<std::size_t> active; 
}; 

你给它多线程或者在建或通过start_thread如何。

然后您queue_task。这会返回一个std::future,告诉你任务何时完成。

随着线程完成任务,他们去threaded_queue并寻找更多。

threaded_queue被销毁时,它会中止其中的所有数据。

thread_pool被销毁时,它会中止所有将来的任务,然后等待所有未完成的任务完成。

Live example