2014-09-21 142 views
1

我想要的是当一个消息队列接收到一个int N时,处理函数将在N秒后调用。下面是我的代码。boost asio deadline_timer async_wait(N秒)在N秒内两次导致操作取消

如果两个接近消息队列的持续时间秒数大于整数N,它将运行正常,但当两个接收消息队列之间的持续时间秒数小于N时,处理程序将在一个处理程序中输出“操作已取消”不是我想要的。

我会很感激任何帮助。

#include <boost/asio.hpp> 
#include <zmq.h> 
#include <boost/thread.hpp> 
#include <iostream> 

boost::asio::io_service io_service; 

void* context = zmq_ctx_new(); 
void* sock_pull = zmq_socket(context, ZMQ_PULL); 


void handler(const boost::system::error_code &ec) { 
    std::cout << "hello, world" << "\t" << ec.message() << std::endl; 
} 

void run() { 
    io_service.run(); 
} 

void thread_listener() { 

    int nRecv; 
    boost::asio::deadline_timer timer(io_service, boost::posix_time::seconds(0)); 
    while(true) { 
     zmq_recv(sock_pull, &nRecv, sizeof(nRecv), 0); 
     std::cout << nRecv << std::endl; 
     timer.expires_from_now(boost::posix_time::seconds(nRecv)); 
     timer.async_wait(handler); 
    } 

} 

int main(int argc, char* argv[]) { 

    boost::asio::io_service::work work(io_service); 

    zmq_bind(sock_pull, "tcp://*:60000"); 
    boost::thread tThread(thread_listener); 
    boost::thread tThreadRun(run); 
    tThread.join(); 
    tThreadRun.join(); 
    return 0; 

} 

回答

2

当你调用

timer.expires_from_now(boost::posix_time::seconds(nRecv)); 

此,as the documentation states,取消任何异步定时器悬而未决。

如果你想在给定的时间在飞行中有重叠的请求,一个计时器显然是不够的。幸运的是,在Asio中绑定的共享指针周围有一个众所周知的模式,可以用来模拟每个响应的“会话”。

说你定义会话包含它自己的私人定时器:

struct session : boost::enable_shared_from_this<session> { 
    session(boost::asio::io_service& svc, int N) : 
     timer(svc, boost::posix_time::seconds(N)) 
    { 
     // Note: shared_from_this is not allowed from ctor 
    } 

    void start() { 
     // it's critical that the completion handler is bound to a shared 
     // pointer so the handler keeps the session alive: 
     timer.async_wait(boost::bind(&session::handler, shared_from_this(), boost::asio::placeholders::error)); 
    } 

    private: 
    void handler(const boost::system::error_code &ec) { 
     std::cout << "hello, world" << "\t" << ec.message() << std::endl; 
    } 

    boost::asio::deadline_timer timer; 
}; 

现在,它的琐碎,以取代使用硬编码的定时器实例代码:

timer.expires_from_now(boost::posix_time::seconds(nRecv)); 
timer.async_wait(handler); 

与会话开始:

boost::make_shared<session>(io_service, nRecv)->start(); 

一个完整的工作示例(带有适当的残存ZMQ东西):Live On Coliru

#include <boost/asio.hpp> 
#include <boost/thread.hpp> 
#include <boost/enable_shared_from_this.hpp> 
#include <boost/make_shared.hpp> 
#include <iostream> 

boost::asio::io_service io_service; 

///////////////////////////////////////////////////////////////////////// 
// I love stubbing out stuff I don't want to install just to help others 
enum { ZMQ_PULL }; 
static void* zmq_ctx_new()   { return nullptr; } 
static void* zmq_socket(void*,int) { return nullptr; } 
static void zmq_bind(void*,char const*) {} 
static void zmq_recv(void*,int*data,size_t,int) 
{ 
    boost::this_thread::sleep_for(boost::chrono::milliseconds(rand()%1000)); 
    *data = 2; 
} 
// End of stubs :) 
///////////////////////////////////////////////////////////////////////// 

void* context = zmq_ctx_new(); 
void* sock_pull = zmq_socket(context, ZMQ_PULL); 

struct session : boost::enable_shared_from_this<session> { 
    session(boost::asio::io_service& svc, int N) : 
     timer(svc, boost::posix_time::seconds(N)) 
    { 
     // Note: shared_from_this is not allowed from ctor 
    } 

    void start() { 
     // it's critical that the completion handler is bound to a shared 
     // pointer so the handler keeps the session alive: 
     timer.async_wait(boost::bind(&session::handler, shared_from_this(), boost::asio::placeholders::error)); 
    } 

    ~session() { 
     std::cout << "bye (session end)\n"; 
    } 

    private: 
    void handler(const boost::system::error_code &ec) { 
     std::cout << "hello, world" << "\t" << ec.message() << std::endl; 
    } 

    boost::asio::deadline_timer timer; 
}; 

void run() { 
    io_service.run(); 
} 

void thread_listener() { 
    int nRecv = 0; 
    for(int n=0; n<4; ++n) { 
     zmq_recv(sock_pull, &nRecv, sizeof(nRecv), 0); 
     std::cout << nRecv << std::endl; 

     boost::make_shared<session>(io_service, nRecv)->start(); 
    } 
} 

int main() { 
    auto work = boost::make_shared<boost::asio::io_service::work>(io_service); 

    zmq_bind(sock_pull, "tcp://*:60000"); 
    boost::thread tThread(thread_listener); 
    boost::thread tThreadRun(run); 

    tThread.join(); 
    work.reset(); 

    tThreadRun.join(); 
} 
+0

新增一个完全工作实例(具有适当存根ZMQ东西):** [实时论Coliru](http://coliru.stacked-crooked.com/a/59174827831685f3)** – sehe 2014-09-21 20:57:32

+0

由于, 它有助于。编译时如果没有“--std = C++ 0x”标志,虽然auto被替换为boost :: smart_ptr ',但会发生错误。 /usr/local/include/boost/smart_ptr/make_shared_object.hpp(747):错误:没有构造函数实例“session :: session”匹配参数列表 – ZFY 2014-09-25 16:21:29

+0

还有另一种方法来实现调用处理程序每​​N秒,只需启动一个线程并休眠N秒钟然后调用处理程序,这与async_wait方法非常不同。您能否介绍一下更多关于这两种方式的信息,这应该是首选,非常感谢。 – ZFY 2014-09-25 16:28:51