2016-03-06 62 views
2

我在C++中拥有以下代码。该代码是从C++ Concurrency In Action: Practical Multithreading在线程在C++中完成后向线程分配新任务

void do_work(unsigned id); 

void f() { 
    std::vector<std::thread> threads; 
    for(unsigned i = 0; i < 20; ++i) { 
     threads.push_back(std::thread(do_work, i)); 
    } 
    std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join)); 
} 

假设线程[0]已完成处理并返回一个值。我仍然有更多的文件要处理,现在想将这个新文件分配给完成的线程。我如何在C++中实现这种行为?或者我必须销毁线程,并在线程完成后创建一个新线程?但是,如何检查这些线程是否已完成?

+0

查找。您将需要更多级别的间接工作负载传递。 –

+0

您需要有一种方法将工作传递给您的线程(通过队列或其他方式)。这与你所展示的有很大不同。这对于proactor来说是一个很好的用例,就像boost :: asio一样 – Chad

回答

2

“我怎样才能在C++中实现这种行为”的简短答案就是编写代码来完成它。您已经确定自己的第一步是“如何检查这些线程是否已完成”。

有几种基本的方法。但是,它们都归结为同一件事情:不是让每个线程简单地消失,而是在每个线程终止之前通知父进程它已完成。

对于初学者来说,每个线程应该知道它是哪个线程。在你的例子中,所有线程都被放置在std::vector中,并且它们由向量的索引来标识。这不是唯一的方法。还有其他的方法来牧养所有的线索,但为了答案的目的,这将会做到。

然后,每个线程都需要通过将线程索引号作为线程参数传递来知道它是哪个索引。你的代码已经做了。精彩。

现在,只需关闭循环的结尾:你只需要实例化一个std::mutex,具有std::condition_variable,这是保护std::queuestd::list。或者,也许是整数的一个std::set。您可以自由决定哪个容器最适合您。

然后,每个线程终止之前,它:

  • 锁定互斥。

  • 将其线程索引放入容器中。

  • 表示条件变量。

  • 解锁互斥锁,然后立即返回,终止此线程。

然后,父线程,开始所有的线程:

  • 锁定互斥

  • 检查队列/套/不管是空的。如果是,它会等待条件变量,直到它不是。

  • 从队列/ set/whatever中删除线程索引,并加入该线程。该线程刚刚结束。现在你知道哪个线程被终止了,并且可以做任何你想要的信息。

  • 完成处理或重新启动线程后,它会再次检查队列是否为空。

1

下面是Sam Varshavchik解释的基本实现。

Live demo

为什么我添加了一个local_queue的原因是为了确保我们的m_Mutex解锁的时候了。如果将其删除,调用push_task的线程可能会被阻塞。

析构函数调用stop(),它设置m_Runningfalse,通知线程关于它,并等待它完成处理所有剩余的任务。

如果工人阶级死亡,线程也死了,这很好。

我的例子只是创建3个线程和每个线程for (int i = 0; i < 5; i++) 5个任务,主要是为了确保在ideone所有的输出显示,但我已经有10个线程和每个线程任务5000测试了它和它运行得很好。

do_work函数有两行,如果您希望输出流正确同步,您可以取消注释。 该课程有多线程支持。

可以stop()并重新start()线程多次,你在线程池实现像

class Worker 
{ 
public: 
    Worker(bool start) : m_Running(start) { if (start) private_start(); } 
    Worker() : m_Running(false) { } 
    ~Worker() { stop(); } 

    template<typename... Args> 
    void push_task(Args&&... args) 
    { 
     { 
      std::lock_guard<std::mutex> lk(m_Mutex); 
      m_Queue.push_back(std::bind(std::forward<Args>(args)...)); 
     } 

     m_Condition.notify_all(); 
    } 

    void start() 
    { 
     { 
      std::lock_guard<std::mutex> lk(m_Mutex); 
      if (m_Running == true) return; 
      m_Running = true; 
     } 

     private_start(); 
    } 

    void stop() 
    { 
     { 
      std::lock_guard<std::mutex> lk(m_Mutex); 
      if (m_Running == false) return; 
      m_Running = false; 
     } 

     m_Condition.notify_all(); 
     m_Thread.join(); 
    } 

private: 
    void private_start() 
    { 
     m_Thread = std::thread([this] 
     { 
      for (;;) 
      { 
       decltype(m_Queue) local_queue; 
       { 
        std::unique_lock<std::mutex> lk(m_Mutex); 
        m_Condition.wait(lk, [&] { return !m_Queue.empty() + !m_Running; }); 

        if (!m_Running) 
        { 
         for (auto& func : m_Queue) 
          func(); 

         m_Queue.clear(); 
         return; 
        } 

        std::swap(m_Queue, local_queue); 
       } 

       for (auto& func : local_queue) 
        func(); 
      } 
     }); 
    } 

private: 
    std::condition_variable m_Condition; 
    std::list<std::function<void()>> m_Queue; 
    std::mutex m_Mutex; 
    std::thread m_Thread; 
    bool m_Running = false; 
}; 

void do_work(unsigned id) 
{ 
    //static std::mutex cout_mutex; 
    //std::lock_guard<std::mutex> lk(cout_mutex); 
    std::cout << id << std::endl; 
} 

int main() 
{ 
    { 
     Worker workers[3]; 
     int counter = 0; 

     for (auto& worker : workers) 
      worker.start(); 

     for (auto& worker : workers) 
     { 
      for (int i = 0; i < 5; i++) 
       worker.push_task(do_work, ++counter + i); 
     } 
    } 

    std::cout << "finish" << std::endl; 
    getchar(); 

    return 0; 
} 
相关问题