2011-08-28 119 views
1

一般而言,我看到通过 "io_service + thread_group" 来创建线程池是一种常见做法。它非常适合固定大小的线程池,或者只会变大的线程池。但我不知道如何在不停止整个 io_service 的情况下让这样的线程池变小。当 boost::thread_group 中的线程正在运行 boost::asio::io_service::run 时,如何缩小这个 thread_group?

因此,我们有如下所示的代码:

// class variables 
asio::io_service io_service; 
boost::thread_group threads; 
asio::io_service::work *work; 

// some pool init function 
work = new asio::io_service::work(io_service); 
int cores_number = boost::thread::hardware_concurrency(); 
    for (std::size_t i = 0; i < cores_number; ++i) 
    threads.create_thread(boost::bind(&asio::io_service::run, &io_service)); 

// and now we can simply post tasks 
io_service.post(boost::bind(&class_name::an_expensive_calculation, this, 42)); 
io_service.post(boost::bind(&class_name::a_long_running_task, this, 123)); 

// and it is really eazy to make pool biger - just call (mutexes may be required) 
threads.create_thread(boost::bind(&asio::io_service::run, &io_service)); 

但是,如果我们想从线程池中移除线程,该怎么办?我们不能只是简单地调用 threads.remove_thread(thread* thrd);,因为(依我看)它并不会停止该线程的运行。所以我想知道:是否有可能、以及如何真正地从这样的池中移除线程?(不是直接中断它们,而是等到线程当前的任务执行完毕。)

更新:

下面是一段可以编译的简单代码:一个带有线程生存时间限制的线程池。

#include <stdio.h> 
#include <iostream> 
#include <fstream> 

//Boost 
#include <boost/asio.hpp> 
#include <boost/thread.hpp> 
#include <boost/thread/locks.hpp> 

// Shared io_service; run() below executes its event loop and main() posts pool_item to it.
boost::asio::io_service io_service; 
// Work object allocated in main() and bound to io_service.
// NOTE(review): presumably keeps io_service::run() from returning while idle — confirm against Boost.Asio docs.
boost::asio::io_service::work *work; 
// Group used to track the pool's threads (add_thread / remove_thread / size).
boost::thread_group threads; 
// Taken around threads.add_thread() in pool_item() and threads.remove_thread() in run().
boost::mutex threads_creation; 
// Milliseconds pool_item() waits for a calculation before treating it as timed out; set in main().
int time_limit; 

int calculate_the_answer_to_life_the_universe_and_everything(int i) 
{ 
    boost::this_thread::sleep(boost::posix_time::milliseconds(i)); 
    std::cout << i << std::endl; 
    return i; 
} 

// Thread entry point for pool workers: executes the shared io_service event loop.
//
// If a handler lets a std::exception escape from io_service.run(), the exception
// is reported and `thread_ptr` is removed from the global thread group (under
// threads_creation) before the thread function returns.
//
// thread_ptr: handle that should refer to the boost::thread object registered in
//             `threads` for this worker; a pointer that is not in the group gives
//             remove_thread nothing to match (presumably a no-op — confirm).
void run(boost::shared_ptr<boost::thread> thread_ptr)
{
    try
    {
        io_service.run();
        return; // normal exit: the worker stays registered in the group
    }
    catch (const std::exception &error)
    {
        std::cout << "exeption: " << error.what() << std::endl;
    }

    {
        // Scope the lock to the group update only.
        boost::mutex::scoped_lock guard(threads_creation);
        threads.remove_thread(thread_ptr.get());
    }
    std::cout << "thread removed from group" << std::endl;
}

void pool_item(int i) 
{ 
    boost::packaged_task<int> pt(boost::bind(calculate_the_answer_to_life_the_universe_and_everything, i)); 
    boost::unique_future<int> fi=pt.get_future(); 

    boost::thread *task = new boost::thread(std::move(pt)); // launch task on a thread 

    if(fi.timed_wait(boost::posix_time::milliseconds(time_limit))) 
    { 
     std::cout << "sucsess function returned: " << fi.get() << std::endl; 
    } 
    else 
    { 
     std::cout << "request took way 2 long!" << std::endl; 

     std::cout << "current group size:" << threads.size() << std::endl; 

     boost::shared_ptr<boost::thread> thread; 
     boost::packaged_task<void> pt(boost::bind(run, thread)); 
     thread = boost::shared_ptr<boost::thread>(new boost::thread(std::move(pt))); 

     boost::mutex::scoped_lock lock(threads_creation); 
     threads.add_thread(thread.get()); 
     lock.unlock(); 

     task->join(); 

     throw std::runtime_error("killed joined thread"); 

    } 
} 

int main() 
{ 
    time_limit = 500; 

    work = new boost::asio::io_service::work(io_service); 
    int cores_number = boost::thread::hardware_concurrency(); 
    for (std::size_t i = 0; i < cores_number; ++i) 
    { 

     boost::shared_ptr<boost::thread> thread; 
     boost::packaged_task<void> pt(boost::bind(run, thread)); 
     thread = boost::shared_ptr<boost::thread>(new boost::thread(std::move(pt))); 
     threads.add_thread(thread.get()); 
    } 

    int i = 800; 
    io_service.post(boost::bind(pool_item, i)); 

    boost::this_thread::sleep(boost::posix_time::milliseconds(i*2)); 
    std::cout << "thread should be removed by now." << std::endl 
     << "group size:" << threads.size() << std::endl; 

    std::cin.get(); 
    return 0; 
} 

正如你所看到的,即使调用了 .remove_thread(ptr);,线程也没有从线程池中被移除 =( 为什么?

更新 #2:

总之,我最终使用了自定义的线程组……

#include <stdio.h> 
#include <iostream> 
#include <fstream> 
#include <set> 

//Boost 
#include <boost/asio.hpp> 
#include <boost/thread.hpp> 
#include <boost/thread/locks.hpp> 

//cf service interface 
//#include <service.hpp> 

//cf-server 
//#include <server.h> 

#include <boost/foreach.hpp> 

class thread_group 
{ 
public: 
    void add(boost::shared_ptr<boost::thread> to_add) 
    { 
     boost::mutex::scoped_lock lock(m); 
     ds_.insert(to_add); 
    } 
    void remove(boost::shared_ptr<boost::thread> to_remove) 
    { 
     boost::mutex::scoped_lock lock(m); 
     ds_.erase(to_remove); 
    } 

    int size() 
    { 
     boost::mutex::scoped_lock lock(m); 
     return ds_.size(); 
    } 

    void join_all(boost::posix_time::milliseconds interuption_time=boost::posix_time::milliseconds(1000)) 
    { 
     boost::mutex::scoped_lock lock(m); 
     BOOST_FOREACH(boost::shared_ptr<boost::thread> t, ds_) 
     { 
      boost::thread interrupter(boost::bind(&thread_group::interupt_thread, this, t, interuption_time)); 
     } 
    } 

private: 
    std::set< boost::shared_ptr<boost::thread> > ds_; 
    boost::mutex m; 
    void interupt_thread(boost::shared_ptr<boost::thread> t, boost::posix_time::milliseconds interuption_time) 
    { 
     try 
     { 
      if(!t->timed_join(interuption_time)) 
       t->interrupt(); 

     } 
     catch(std::exception &e) 
     { 
     } 
    } 
}; 

// Shared io_service; run() below executes its event loop and main() posts pool_item to it.
boost::asio::io_service io_service; 
// Work object allocated in main() and bound to io_service.
// NOTE(review): presumably keeps io_service::run() from returning while idle — confirm against Boost.Asio docs.
boost::asio::io_service::work *work; 
// Custom thread group (defined above) tracking the pool's threads.
thread_group threads; 
// Milliseconds pool_item() waits for a calculation before treating it as timed out; set in main().
int time_limit; 



int calculate_the_answer_to_life_the_universe_and_everything(int i) 
{ 
    boost::this_thread::sleep(boost::posix_time::milliseconds(i)); 
    std::cout << i << std::endl; 
    return i; 
} 

// Thread entry point for pool workers: executes the shared io_service event loop.
//
// If a handler lets a std::exception escape from io_service.run(), the exception
// is reported and `thread_ptr` is removed from the global thread group before
// the thread function returns.
//
// thread_ptr: handle under which this worker was registered in `threads`.
void run(boost::shared_ptr<boost::thread> thread_ptr)
{
    try
    {
        io_service.run();
        return; // normal exit: the worker stays registered in the group
    }
    catch (const std::exception &error)
    {
        std::cout << "exeption: " << error.what() << std::endl;
    }

    threads.remove(thread_ptr);
    std::cout << "thread removed from group" << std::endl;
}

void pool_item(int i) 
{ 
    boost::packaged_task<int> pt(boost::bind(calculate_the_answer_to_life_the_universe_and_everything, i)); 
    boost::unique_future<int> fi=pt.get_future(); 

    boost::thread *task = new boost::thread(std::move(pt)); // launch task on a thread 

    if(fi.timed_wait(boost::posix_time::milliseconds(time_limit))) 
    { 
     std::cout << "sucsess function returned: " << fi.get() << std::endl; 
    } 
    else 
    { 
     std::cout << "request took way 2 long!" << std::endl; 

     std::cout << "current group size:" << threads.size() << std::endl; 
     std::cout << "we want to add thread!" << std::endl; 
     boost::shared_ptr<boost::thread> thread; 
     boost::packaged_task<void> pt(boost::bind(run, thread)); 
     threads.add(thread); 
     std::cout << "thread added" << std::endl 
      << "current group size:" << threads.size() << std::endl; 
     task->join(); 

     throw std::runtime_error("killed joined thread"); 

    } 
} 

int main() 
{ 
    time_limit = 500; 

    work = new boost::asio::io_service::work(io_service); 
    int cores_number = boost::thread::hardware_concurrency(); 
    for (std::size_t i = 0; i < cores_number; ++i) 
    { 

     boost::shared_ptr<boost::thread> thread; 
     boost::packaged_task<void> pt(boost::bind(run, thread)); 
     thread = boost::shared_ptr<boost::thread>(new boost::thread(std::move(pt))); 
     threads.add(thread); 
    } 

    int i = 800; 
    io_service.post(boost::bind(pool_item, i)); 

    boost::this_thread::sleep(boost::posix_time::milliseconds(i*2)); 
    std::cout << "thread should be removed by now." << std::endl 
     << "group size:" << threads.size() << std::endl; 

    std::cin.get(); 
    return 0; 
} 

回答

3

过去我曾利用"回调抛出异常时 run() 会退出"这一事实做到过这一点。我没有直接在线程中启动 run(),而是调用一个工具函数;当相应的异常被抛出时,该函数会返回,线程也随之退出:

// Thread entry point: runs the io_service event loop until it returns or a
// handler throws. An exception derived from std::exception ends the loop and
// therefore the thread, which is how the pool is shrunk by one.
void RunIOService()
{
    try
    {
        io_service.run();
    }
    catch (const std::exception &)
    {
        // Caught by const reference: catching by value copies and slices the
        // exception. Swallowed on purpose - the exception only serves as the
        // signal for this thread to leave the pool.
    }
}

然后,所有你需要做的就是安排一个回调,这将抛出一个异常:

// Handler posted to the io_service to make one pool thread exit. The body is a
// placeholder: it is meant to throw an exception that RunIOService() catches
// (that function catches std::exception, so the thrown type must derive from it).
static void KillThreadCallback() 
{ 
    // throw some exception that you catch above 
} 

// Queue the handler on the io_service.
io_service.post(&KillThreadCallback); 

这将导致执行此回调的线程退出,实质上使线程池的线程数减少 1。使用这种方法,你可以很容易地扩展和收缩 io_service 线程池。它也可用于干净地关闭 I/O 服务(使用 C++0x lambda 表达式)。

+0

请参阅我的帖子更新。 – Rella

1

一种模式:

// Thread body: executes io_service handlers one at a time for as long as
// m_keepRunning is set. Exceptions thrown by a handler are caught so that the
// loop continues with the next handler.
void ThreadLoop()
{
    while (m_keepRunning) {
        try {
            // run_one() returns the number of handlers it executed. Zero means the
            // io_service has been stopped or has run out of work; every further
            // call would return immediately, so leave instead of spinning.
            if (io_service.run_one() == 0) {
                break;
            }
        } catch (const std::exception& e) {
            // error handling
        }
    }
}

// Requests that ThreadLoop() end by posting a handler that clears m_keepRunning,
// so the flag is written from inside an io_service handler rather than directly
// by the caller of Stop().
// The two post() calls show alternative spellings of the same request (lambda
// vs. boost::bind); as written, both handlers are queued.
void Stop() 
{ 
    // Using C++0x lambdas 
    io_service.post([=]{ m_keepRunning = false; }); 
    // or 
    io_service.post(boost::bind(&ThisClass::StopCallback, this)); 
} 

// Handler posted by Stop(): clears the flag that ThreadLoop() tests on each iteration.
void StopCallback() 
{ 
    m_keepRunning = false; 
} 

m_keepRunning 是一个成员变量,只应在 I/O 服务线程中访问。