2013-03-06 53 views
2

我使用boost:asio与多个io_services保持不同形式的阻塞I/O分离。例如。我有一个用于阻止文件I/O的io_service,另一个用于长期运行的CPU绑定任务(这可以扩展到阻止网络I/O的第三个等等)。一般来说,我想确保一种形式的阻塞I/O不能让其他人饿死。保持两个交叉通信asio io_service对象忙

我遇到的问题是,因为在一个io_service中运行的任务可以将事件发布到其他io_service(例如,一个CPU绑定的任务可能需要启动文件I/O操作,或者完成的文件I/O操作可能调用一个CPU绑定的回调),我不知道如何让两个io_services都运行,直到它们都没有事件发生。

通常与单个I/O服务,你做这样的事情:

shared_ptr<asio::io_service> io_service (new asio::io_service); 
shared_ptr<asio::io_service::work> work (
    new asio::io_service::work(*io_service)); 

// Create worker thread(s) that call io_service->run() 

io_service->post(/* some event */); 

work.reset(); 

// Join worker thread(s) 

但是,如果我只是做这两个io_services,一进我没有张贴的初始事件立即结束。即使我向两者发布初始事件,如果io_service B上的初始事件在io_service A上的任务向B发布新事件之前完成,io_service B将提前结束。

我怎样才能保持io_service对象B上运行,而io_service对象A仍然处理事件(因为在服务质量上排队的事件之一可能会发布一个新的事件B),反之亦然,同时还要确保两个io_services退出了run()方法是否同时出现两个事件?

回答

3

具体任务想出一个办法做到这一点,所以记录它的情况下,任何人的记录发现在搜索这个问题:

  • 创建每个N跨通信io_services,创造他们每个人的工作对象,然后开始他们的工作线程。

  • 创建一个“主”io_service对象,它不会运行任何工作线程。

  • 不允许将事件直接发布到服务。相反,为io_services创建访问器函数,它将:

    1. 在主线程上创建一个工作对象。
    2. 将回调包装在运行真实回调的函数中,然后删除工作。
    3. 改为发布此包装回调。
  • 在执行中的主流,一旦所有的N个io_services的工作已经开始,您已发布的工作对他们中的至少一个,呼吁主io_service对象的run()。

  • 当主io_service的run()方法返回时,删除N个交叉通信io_services上的所有初始工作,并加入所有工作线程。

具有主io_service对象的线程在其他每个io_services的本职工作,确保他们不会停止,直到主io_service对象用完的工作。让每个其他io_services拥有主发送回调的主io_service的工作,以确保主io_service不会失去工作,直到其他io_services中的每个io_services都不再有任何待处理的回调回调。

一个例子(可以在一个类中enapsulated):

shared_ptr<boost::asio::io_service> master_io_service; 

void RunWorker(boost::shared_ptr<boost::asio::io_service> io_service) { 
    io_service->run(); 
} 

void RunCallbackAndDeleteWork(boost::function<void()> callback, 
           boost::asio::io_service::work* work) { 
    callback(); 
    delete work; 
} 

// All new posted callbacks must come through here, rather than being posted 
// directly to the io_service object. 
void PostToService(boost::shared_ptr<boost::asio::io_service> io_service, 
        boost::function<void()> callback) { 
    io_service->post(boost::bind(
     &RunCallbackAndDeleteWork, callback, 
     new boost::asio::io_service::work(*master_io_service))); 
} 

int main() { 
    vector<boost::shared_ptr<boost::asio::io_service> > io_services; 
    vector<boost::shared_ptr<boost::asio::io_service::work> > initial_work; 
    boost::thread_pool worker_threads; 

    master_io_service.reset(new boost::asio::io_service); 

    const int kNumServices = X; 
    const int kNumWorkersPerService = Y; 
    for (int i = 0; i < kNumServices; ++i) { 
    shared_ptr<boost::asio::io_service> io_service(new boost::asio::io_service); 
    io_services.push_back(io_service); 
    initial_work.push_back(new boost::asio::io_service::work(*io_service)); 

    for (int j = 0; j < kNumWorkersPerService; ++j) { 
     worker_threads.create_thread(boost::bind(&RunWorker, io_service)); 
    } 
    } 

    // Use PostToService to start initial task(s) on at least one of the services 

    master_io_service->run(); 

    // At this point, there is no real work left in the services, only the work 
    // objects in the initial_work vector. 
    initial_work.clear(); 
    worker_threads.join_all(); 
    return 0; 
} 
+0

+1好方案泰勒 – 2013-03-09 02:37:03

2

HTTP server example 2做了类似的事情,您可能会发现有用。它使用保留vectorio_service池的概念shared_ptr<boost::asio::io_service>shared_ptr<boost::asio::io_service::work>为每个io_service。它使用线程池来运行每个服务。

该示例使用轮询调度工作少量发放到I/O服务,我不认为这会在你的情况下适用,因为你有io_service A和B. io_service

+0

酷,我不知道io_service对象::工作。我总是在无穷远处安排一个计时器。 +1 – rhashimoto 2013-03-07 18:53:41

+0

很好的例子,但不是我所需要的,因为在那个例子中,一个服务的工作线程永远不会将事件发布到其他服务之一。 – 2013-03-08 18:14:04

相关问题