2012-07-26 96 views
9

我使用boost::asio::io_service作为基本线程池。一些线程被添加到io_service中,主线程开始发布处理程序,工作线程开始运行处理程序,并且所有事情都完成了。到现在为止还挺好;我通过单线程代码获得了很好的加速效果。使用Boost Asio设置邮政队列大小限制?

但是,主线程有数以百万计的事情发布。它只是继续发布它们,比工作线程处理它们要快得多。我没有达到内存限制,但将这么多东西排入队伍仍然很愚蠢。我想要做的是为处理程序队列设置固定大小,如果队列已满,则使用post()块。

我在Boost ASIO文档中看不到任何选项。这可能吗?

回答

0

你可以使用strand对象来放置事件并延迟你的main?所有工作发布后,您的计划是否会退出?如果是这样,你可以使用工作对象,这将让你更好地控制你的io_service何时停止。

你总是可以主要检查线程的状态,并让它等待,直到一个变得自由或类似的东西。

//链接

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service__strand.html

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service.html

//example from the second link 
boost::asio::io_service io_service; 
boost::asio::io_service::work work(io_service); 

希望这会有所帮助。

+0

问题不在于'io_service'在完成工作之前停止---我们知道删除'work'对象以使'io_service'正常停止。问题在于'io_service'允许累积太多的任务。我们希望以一种不涉及创建任务的线程轮询的方式来限制未分配的任务的数量,因此我们关于是否可以阻止“poll()”的问题。 – uckelman 2012-07-28 21:34:17

2

我正在使用信号量修复处理程序队列大小。下面的代码说明了此解决方案:

void Schedule(boost::function<void()> function) 
{ 
    semaphore.wait(); 
    io_service.post(boost::bind(&TaskWrapper, function)); 
} 

void TaskWrapper(boost::function<void()> &function) 
{ 
    function(); 
    semaphore.post(); 
} 
1

你可以用你的拉姆达在另一个lambda这将需要数着“正在进行”任务的照顾,然后如果有太多正在进行的任务发布前等待。

例子:

#include <atomic> 
#include <chrono> 
#include <future> 
#include <iostream> 
#include <mutex> 
#include <thread> 
#include <vector> 
#include <boost/asio.hpp> 

class ThreadPool { 
    using asio_worker = std::unique_ptr<boost::asio::io_service::work>; 
    boost::asio::io_service service; 
    asio_worker service_worker; 
    std::vector<std::thread> grp; 
    std::atomic<int> inProgress = 0; 
    std::mutex mtx; 
    std::condition_variable busy; 
public: 
    ThreadPool(int threads) : service(), service_worker(new asio_worker::element_type(service)) { 
    for (int i = 0; i < threads; ++i) { 
     grp.emplace_back([this] { service.run(); }); 
    } 
    } 

    template<typename F> 
    void enqueue(F && f) { 
    std::unique_lock<std::mutex> lock(mtx); 
    // limit queue depth = number of threads 
    while (inProgress >= grp.size()) { 
     busy.wait(lock); 
    } 
    inProgress++; 
    service.post([this, f = std::forward<F>(f)]{ 
     try { 
     f(); 
     } 
     catch (...) { 
     inProgress--; 
     busy.notify_one(); 
     throw; 
     } 
     inProgress--; 
     busy.notify_one(); 
    }); 
    } 

    ~ThreadPool() { 
    service_worker.reset(); 
    for (auto& t : grp) 
     if (t.joinable()) 
     t.join(); 
    service.stop(); 
    } 
}; 

int main() { 
    std::unique_ptr<ThreadPool> pool(new ThreadPool(4)); 
    for (int i = 1; i <= 20; ++i) { 
    pool->enqueue([i] { 
     std::string s("Hello from task "); 
     s += std::to_string(i) + "\n"; 
     std::cout << s; 
     std::this_thread::sleep_for(std::chrono::seconds(1)); 
    }); 
    } 
    std::cout << "All tasks queued.\n"; 
    pool.reset(); // wait for all tasks to complete 
    std::cout << "Done.\n"; 
} 

输出:

Hello from task 3 
Hello from task 4 
Hello from task 2 
Hello from task 1 
Hello from task 5 
Hello from task 7 
Hello from task 6 
Hello from task 8 
Hello from task 9 
Hello from task 10 
Hello from task 11 
Hello from task 12 
Hello from task 13 
Hello from task 14 
Hello from task 15 
Hello from task 16 
Hello from task 17 
Hello from task 18 
All tasks queued. 
Hello from task 19 
Hello from task 20 
Done. 
0

也许尝试降低主线程的优先级,以便一旦工作线程忙碌起来,他们挨饿主线程和系统的自我限制。