2017-04-13 78 views
1

我想同时使用 Boost.Asio 的 strand 包装器和优先级包装器。如何在 Boost.Asio 中结合使用 strand 包装器和优先级包装器?

在编写代码之前,我已经阅读了以下资料:

Boost asio priority and strand

boost::asio and Active Object

http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531

Why do I need strand per connection when using boost::asio?

我想使用包装(wrap)的方式,因为我想使用各种异步 API,例如 async_read、async_write 和 async_connect。根据 http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531,似乎可以把优先级包装器和 strand 包装器组合起来使用。

所以我写了基于下面的示例代码:

http://www.boost.org/doc/libs/1_63_0/doc/html/boost_asio/example/cpp03/invocation/prioritised_handlers.cpp

这里是我的代码:

#include <iostream> 
#include <functional> 
#include <queue> 
#include <vector> 
#include <thread> 
#include <mutex> 

#include <boost/asio.hpp> 
#include <boost/optional.hpp> 

#define ENABLE_STRAND 1 
#define ENABLE_PRIORITY 1 

// Priority queue of deferred, argument-less handlers.
//
// add() stores a callable together with an integer priority; execute_all()
// runs the stored callables, largest priority value first. wrap() pairs a
// user handler with this queue and a priority so that the free
// asio_handler_invoke() hook can redirect the handler's invocation into the
// queue. Every access to the underlying std::priority_queue is made while
// holding mtx_.
class handler_priority_queue {
public:
    // Queues `handler` under `priority`.
    //
    // `handler` must be callable with no arguments (it is stored in a
    // std::function<void()>). The diagnostic line is written before the
    // mutex is taken.
    template <typename Handler>
    void add(int priority, Handler&& handler) {
        std::cout << "add(" << priority << ")" << std::endl;
        std::lock_guard<std::mutex> g(mtx_);
        handlers_.emplace(priority, std::forward<Handler>(handler));
    }

    // Runs queued handlers, highest priority first, until the queue is
    // observed to be empty.
    //
    // The mutex is held only while the top entry is copied out and popped;
    // the handler itself runs with the mutex released, so a handler may call
    // add() on this same queue.
    void execute_all() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mtx_);
            if (handlers_.empty()) return;
            queued_handler next = handlers_.top();
            handlers_.pop();
            lock.unlock();
            next.execute();
        }
    }

    // Callable adaptor that carries a user handler plus the queue and the
    // priority it should be scheduled with.
    //
    // The wrapper keeps a reference to the queue, so the queue must outlive
    // every wrapper created from it. The data members are public (see the
    // commented-out `private:`) because the free asio_handler_invoke() hook
    // reads them directly.
    template <typename Handler>
    class wrapped_handler {
    public:
        wrapped_handler(handler_priority_queue& q, int p, Handler h)
            : queue_(q), priority_(p), handler_(std::move(h))
        {
        }

        // Logs the call and forwards all arguments to the user handler.
        template <typename... Args>
        void operator()(Args&&... args) {
            std::cout << "operator() " << std::endl;
            handler_(std::forward<Args>(args)...);
        }

        //private:
        handler_priority_queue& queue_;
        int priority_;
        Handler handler_;
    };

    // Wraps `handler` so that it is associated with this queue and `priority`.
    //
    // The handler is taken by value: the returned wrapper always owns its own
    // copy, whether the caller passes a temporary or a named (lvalue)
    // callable. With a forwarding reference an lvalue argument would deduce a
    // reference type for `Handler`.
    template <typename Handler>
    wrapped_handler<Handler> wrap(int priority, Handler handler) {
        return wrapped_handler<Handler>(*this, priority, std::move(handler));
    }

private:
    // One queue entry: a priority and the type-erased nullary callable.
    class queued_handler {
    public:
        template <typename Handler>
        queued_handler(int p, Handler&& handler)
            : priority_(p), function_(std::forward<Handler>(handler))
        {
            std::cout << "queued_handler()" << std::endl;
        }

        // Logs the priority, then invokes the stored callable.
        void execute() {
            std::cout << "execute(" << priority_ << ")" << std::endl;
            function_();
        }

        // Orders entries by priority only; std::priority_queue therefore
        // yields the entry with the largest priority value first.
        friend bool operator<(
            queued_handler const& lhs,
            queued_handler const& rhs) {
            return lhs.priority_ < rhs.priority_;
        }

    private:
        int priority_;
        std::function<void()> function_;
    };

    std::priority_queue<queued_handler> handlers_;
    std::mutex mtx_;  // guards handlers_
};

// Custom invocation hook for wrapped handlers.
//
// Instead of running the function object right away, this queues it on the
// handler_priority_queue referenced by the wrapped handler, under that
// handler's priority, so that it runs later from execute_all().
//
//   f - function object handed to the hook; forwarded into the queue as-is.
//   h - the wrapped_handler that supplies the target queue and the priority.
//
// NOTE(review): f is stored unchanged, so any outer wrapping it carries
// (for example a strand wrapper around the priority wrapper) presumably
// takes effect again when the queued entry is finally called — confirm
// before combining this hook with strand.wrap().
template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, std::forward<Function>(f)); 
} 

//---------------------------------------------------------------------- 

// Demo driver.
//
// Posts num_of_tasks handlers to an io_service, each optionally wrapped by
// the priority queue (ENABLE_PRIORITY) and/or the strand (ENABLE_STRAND),
// with the task number used as the priority. A pool of num_of_threads
// threads then drives the io_service and drains the priority queue.
int main() {
    int const num_of_threads = 4;
    int const num_of_tasks = 5;

    boost::asio::io_service ios;
    boost::asio::strand strand(ios);

    handler_priority_queue pq;

    // When both macros are enabled the strand wrapper is the outer layer and
    // the priority wrapper the inner one.
    for (int task = 0; task < num_of_tasks; ++task) {
        ios.post(
#if ENABLE_STRAND
            strand.wrap(
#endif
#if ENABLE_PRIORITY
                pq.wrap(
                    task,
#endif
                    [=] {
                        std::cout << "[called] " << task << "," << std::this_thread::get_id() << std::endl;
                    }
#if ENABLE_PRIORITY
                )
#endif
#if ENABLE_STRAND
            )
#endif
        );
    }

    // Per-thread loop: wait for one io_service handler, poll whatever else is
    // ready, then run everything that was put on the priority queue.
    auto worker = [&] {
        std::cout << "before run_one()" << std::endl;
        while (ios.run_one() != 0) {
            std::cout << "before poll_one()" << std::endl;
            while (ios.poll_one() != 0) {
                // keep polling until nothing is ready
            }
            std::cout << "before execute_all()" << std::endl;
            pq.execute_all();
        }
    };

    std::vector<std::thread> pool;
    for (int n = 0; n < num_of_threads; ++n) {
        pool.emplace_back(worker);
    }
    for (std::thread& t : pool) {
        t.join();
    }
}

这两种包装器分别由下列宏启用:

#define ENABLE_STRAND 1 
#define ENABLE_PRIORITY 1 

当两个宏都启用时,我得到以下结果:

before run_one() 
asio_handler_invoke 
add(0) 
queued_handler() 
before poll_one() 
asio_handler_invoke 
add(1) 
queued_handler() 
asio_handler_invoke 
add(2) 
queued_handler() 
asio_handler_invoke 
add(3) 
queued_handler() 
asio_handler_invoke 
add(4) 
queued_handler() 
before execute_all() 
execute(4) 
execute(3) 
execute(2) 
execute(1) 
execute(0) 
before run_one() 
before run_one() 
before run_one() 

我期望得到如下形式的输出:

[called] priority,thread_id 

例如:

[called] 1,140512649541376 

但实际上并没有得到这样的输出。

看起来 function_() 被调用了,但 wrapped_handler::operator() 没有被调用。(function_() 是在我的代码中由 pq.execute_all() 调用的。)

// Excerpt of queued_handler::execute(): logs the entry's priority and then
// invokes the stored type-erased callable.
void execute() { 
    std::cout << "execute(" << priority_ << ")" << std::endl; 
    function_(); // It is called. 
} 

template <typename Handler> 
class wrapped_handler { 
public: 

    template <typename... Args> 
    void operator()(Args&&... args) { // It is NOT called 
     std::cout << "operator() " << std::endl; 
     handler_(std::forward<Args>(args)...); 
    } 

我跟踪了 function_() 被调用之后的调用序列。

下列函数调用:

https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L191 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L76 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/strand.hpp#L158 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.hpp#L55 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.ipp#L94

最后进入函数 bool strand_service::do_dispatch(implementation_type& impl, operation* op)。

在该函数中,操作 op 没有被调用,而是在下面这一行被推入了队列:

https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.ipp#L111

我不明白为什么 function_() 会被分派(dispatch)到 strand_service。我原以为 strand 包装器已经在我代码中的以下位置被解开(unwrap)了:

// Invocation hook for handlers created by handler_priority_queue::wrap():
// queues the function object f on h's queue under h's priority instead of
// calling it here.
template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, std::forward<Function>(f)); 
} 

如果只启用优先级包装器,我得到以下结果。看起来它按我的预期工作。

before run_one() 
asio_handler_invoke 
add(0) 
queued_handler() 
before poll_one() 
asio_handler_invoke 
add(1) 
queued_handler() 
asio_handler_invoke 
add(2) 
queued_handler() 
asio_handler_invoke 
add(3) 
queued_handler() 
asio_handler_invoke 
add(4) 
queued_handler() 
before execute_all() 
execute(4) 
operator() 
[called] 4,140512649541376 
execute(3) 
operator() 
[called] 3,140512649541376 
execute(2) 
operator() 
[called] 2,140512649541376 
execute(1) 
operator() 
[called] 1,140512649541376 
execute(0) 
operator() 
[called] 0,140512649541376 
before run_one() 
before run_one() 
before run_one() 

如果只启用 strand 包装器,我得到以下结果。看起来它也按预期工作。

before run_one() 
[called] 0,140127385941760 
before poll_one() 
[called] 1,140127385941760 
[called] 2,140127385941760 
[called] 3,140127385941760 
[called] 4,140127385941760 
before execute_all() 
before run_one() 
before run_one() 
before run_one() 

任何想法?

回答

1

我解决了这个问题。

我不明白为什么 function_() 会被分派(dispatch)到 strand_service。我原以为 strand 包装器已经在我代码中的以下位置被解开(unwrap)了:

// Invocation hook for handlers created by handler_priority_queue::wrap():
// queues the function object f on h's queue under h's priority instead of
// calling it here.
template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, std::forward<Function>(f)); 
} 

参数 f 是原始的处理程序,也就是说,它是同时被优先级队列包装器和 strand 包装器包装过的处理程序,其中 strand 包装器位于外层。因此在调用 f 时,它会再次被分派到 strand_service。由于这一过程发生在同一个 strand 之内,处理程序只是被放入 strand 的队列,而不会被调用。

为了解决这个问题,应当把 h->handler_(而不是 f)加入优先级队列,如下所示:

// Custom invocation hook for wrapped handlers.
//
// Defers the wrapped handler: a copy of the user's original handler
// (h->handler_) is queued on h's handler_priority_queue under h's priority.
//
// The first parameter (the function object handed to the hook) is
// deliberately unnamed and ignored, so none of the wrapping it carries is
// re-applied when the queued entry runs. Because only h->handler_ is queued,
// the handler is later called with no arguments.
//
//   h - the wrapped_handler supplying the queue, the priority and the
//       handler to enqueue.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& /*f*/,
         handler_priority_queue::wrapped_handler<Handler>* h) {
    std::cout << "asio_handler_invoke " << std::endl;
    h->queue_.add(h->priority_, h->handler_);
}

handler_ 是类模板 wrapped_handler 的成员变量,它保存的是未经包装的原始处理程序。

下面是完整的代码:

#include <iostream> 
#include <functional> 
#include <queue> 
#include <vector> 
#include <thread> 
#include <mutex> 

#include <boost/asio.hpp> 
#include <boost/optional.hpp> 

#define ENABLE_STRAND 1 
#define ENABLE_PRIORITY 1 

// Priority queue of deferred, argument-less handlers.
//
// add() stores a callable together with an integer priority; execute_all()
// runs the stored callables, largest priority value first. wrap() pairs a
// user handler with this queue and a priority so that the free
// asio_handler_invoke() hook can redirect the handler's invocation into the
// queue. Every access to the underlying std::priority_queue is made while
// holding mtx_.
class handler_priority_queue {
public:
    // Queues `handler` under `priority`.
    //
    // `handler` must be callable with no arguments (it is stored in a
    // std::function<void()>). The diagnostic line is written before the
    // mutex is taken.
    template <typename Handler>
    void add(int priority, Handler&& handler) {
        std::cout << "add(" << priority << ")" << std::endl;
        std::lock_guard<std::mutex> g(mtx_);
        handlers_.emplace(priority, std::forward<Handler>(handler));
    }

    // Runs queued handlers, highest priority first, until the queue is
    // observed to be empty.
    //
    // The mutex is held only while the top entry is copied out and popped;
    // the handler itself runs with the mutex released, so a handler may call
    // add() on this same queue.
    void execute_all() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mtx_);
            if (handlers_.empty()) return;
            queued_handler next = handlers_.top();
            handlers_.pop();
            lock.unlock();
            next.execute();
        }
    }

    // Callable adaptor that carries a user handler plus the queue and the
    // priority it should be scheduled with.
    //
    // The wrapper keeps a reference to the queue, so the queue must outlive
    // every wrapper created from it. The data members are public (see the
    // commented-out `private:`) because the free asio_handler_invoke() hook
    // reads them directly.
    template <typename Handler>
    class wrapped_handler {
    public:
        // Perfect-forwards `h` into the stored handler.
        template <typename HandlerArg>
        wrapped_handler(handler_priority_queue& q, int p, HandlerArg&& h)
            : queue_(q), priority_(p), handler_(std::forward<HandlerArg>(h))
        {
        }

        // Logs the call and forwards all arguments to the user handler.
        template <typename... Args>
        void operator()(Args&&... args) {
            std::cout << "operator() " << std::endl;
            handler_(std::forward<Args>(args)...);
        }

        //private:
        handler_priority_queue& queue_;
        int priority_;
        Handler handler_;
    };

    // Wraps `handler` so that it is associated with this queue and `priority`.
    //
    // The handler is taken by value: the returned wrapper always owns its own
    // copy, whether the caller passes a temporary or a named (lvalue)
    // callable. With a forwarding reference an lvalue argument would deduce a
    // reference type for `Handler`, and the wrapper would then only hold a
    // reference to the caller's object.
    template <typename Handler>
    wrapped_handler<Handler> wrap(int priority, Handler handler) {
        return wrapped_handler<Handler>(*this, priority, std::move(handler));
    }

private:
    // One queue entry: a priority and the type-erased nullary callable.
    class queued_handler {
    public:
        template <typename Handler>
        queued_handler(int p, Handler&& handler)
            : priority_(p), function_(std::forward<Handler>(handler))
        {
            std::cout << "queued_handler()" << std::endl;
        }

        // Logs the priority, then invokes the stored callable.
        void execute() {
            std::cout << "execute(" << priority_ << ")" << std::endl;
            function_();
        }

        // Orders entries by priority only; std::priority_queue therefore
        // yields the entry with the largest priority value first.
        friend bool operator<(
            queued_handler const& lhs,
            queued_handler const& rhs) {
            return lhs.priority_ < rhs.priority_;
        }

    private:
        int priority_;
        std::function<void()> function_;
    };

    std::priority_queue<queued_handler> handlers_;
    std::mutex mtx_;  // guards handlers_
};

// Custom invocation hook for wrapped handlers.
//
// Defers the wrapped handler: a copy of the user's original handler
// (h->handler_) is queued on h's handler_priority_queue under h's priority.
//
// The first parameter (the function object handed to the hook) is
// deliberately unnamed and ignored, so none of the wrapping it carries is
// re-applied when the queued entry runs. Because only h->handler_ is queued,
// the handler is later called with no arguments.
//
//   h - the wrapped_handler supplying the queue, the priority and the
//       handler to enqueue.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& /*f*/,
         handler_priority_queue::wrapped_handler<Handler>* h) {
    std::cout << "asio_handler_invoke " << std::endl;
    h->queue_.add(h->priority_, h->handler_);
}

//---------------------------------------------------------------------- 

// Demo driver.
//
// Posts num_of_tasks handlers to an io_service, each optionally wrapped by
// the priority queue (ENABLE_PRIORITY) and/or the strand (ENABLE_STRAND),
// with the loop index used as the priority. A pool of num_of_threads
// threads then drives the io_service and drains the priority queue.
int main() {
    int const num_of_threads = 4;
    int const num_of_tasks = 5;

    boost::asio::io_service ios;
    boost::asio::strand strand(ios);

    handler_priority_queue pq;

    for (int i = 0; i != num_of_tasks; ++i) {
        // When both macros are enabled the strand wrapper is the outer layer
        // and the priority wrapper the inner one. Each closing parenthesis
        // sits under the same macro as the call it closes, so the guards
        // nest in mirror order.
        ios.post(
#if ENABLE_STRAND
            strand.wrap(
#endif
#if ENABLE_PRIORITY
                pq.wrap(
                    i,
#endif
                    [=] {
                        std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl;
                    }
#if ENABLE_PRIORITY
                )
#endif
#if ENABLE_STRAND
            )
#endif
        );
    }

    std::vector<std::thread> pool;
    for (int i = 0; i != num_of_threads; ++i) {
        pool.emplace_back([&] {
            std::cout << "before run_one()" << std::endl;
            // Wait for one io_service handler, poll whatever else is ready,
            // then run everything that was put on the priority queue.
            while (ios.run_one()) {
                std::cout << "before poll_one()" << std::endl;
                while (ios.poll_one())
                    ;
                std::cout << "before execute_all()" << std::endl;
                pq.execute_all();
            }
        });
    }
    for (auto& t : pool) t.join();
}

这里是输出:

before run_one() 
asio_handler_invoke 
add(0) 
queued_handler() 
before poll_one() 
asio_handler_invoke 
add(1) 
queued_handler() 
asio_handler_invoke 
add(2) 
queued_handler() 
asio_handler_invoke 
add(3) 
queued_handler() 
asio_handler_invoke 
add(4) 
queued_handler() 
before execute_all() 
execute(4) 
[called] 4,139903315736320 
execute(3) 
[called] 3,139903315736320 
execute(2) 
[called] 2,139903315736320 
execute(1) 
[called] 1,139903315736320 
execute(0) 
[called] 0,139903315736320 
before run_one() 
before run_one() 
before run_one()