2012-02-15 213 views
2

对于p2p应用程序的异步网络编程工作,我遇到了麻烦。 我的应用程序必须是服务器和客户端。当服务器收到 请求时,它必须将其广播到其他服务器的k。我认为boost :: asio示例的HTTP Server 3 Example可以很好地工作,并在其中实现异步客户端(作为类)。 上述(从升压:: ASIO客户端的例子),客户端类如下:客户端的C++ boost/asio服务器

ClientIO::ClientIO(boost::asio::io_service& io_service, tcp::resolver::iterator endpoint_iterator) 
: _io_service(io_service), 
     strand_(io_service), 
     resolver_(io_service), 
    socket_(io_service) 
    { 
    tcp::endpoint endpoint = *endpoint_iterator; 
     socket_.async_connect(endpoint, 
     boost::bind(&ClientIO::handle_after_connect, this, 
     boost::asio::placeholders::error, ++endpoint_iterator)); 
    } 

    void ClientIO::write(G3P mex) 
    { 
    _io_service.post(boost::bind(&ClientIO::writeMessage, this, mex)); 
    } 

    void ClientIO::writeMessage(G3P mex) 
    { 
    bool write_in_progress = !messages_queue_.empty(); 
    messages_queue_.push_back(mex); 
    if (!write_in_progress) 
    { 
    char* message=NULL; 
    boost::system::error_code ec; 
    if (messages_queue_.front().opcode == DATA) 
    { 
     message=(char*)malloc((10800)*sizeof(char)); 
    } 
    else 
     message=(char*)malloc(1024*sizeof(char)); 

    boost::asio::streambuf request; 
    std::ostream request_stream(&request); 
    serializeMessage(message, messages_queue_.front()); 
    request_stream << message; 
    boost::asio::async_write(socket_, boost::asio::buffer(message, strlen(message)), 
    strand_.wrap(
    boost::bind(&ClientIO::handle_after_write, this, 
    boost::asio::placeholders::error))); 
     free(message); 
    } 
    } 

    void ClientIO::readMessage() 
    { 
boost::asio::async_read(socket_, data_, 
    boost::bind(&ClientIO::handle_after_read, this, 
boost::asio::placeholders::error, 
boost::asio::placeholders::bytes_transferred 
    )); 
    } 

    void ClientIO::stop() 
    { 
    socket_.shutdown(tcp::socket::shutdown_both); 
    socket_.close(); 
    } 

    void ClientIO::handle_after_connect(const boost::system::error_code& error, 
    tcp::resolver::iterator endpoint_iterator) 
    { 
    if (error) 
    { 
    if (endpoint_iterator != tcp::resolver::iterator()) 
    { 
     socket_.close(); 
     tcp::endpoint endpoint = *endpoint_iterator; 
     socket_.async_connect(endpoint, 
     boost::bind(&ClientIO::handle_after_connect,this, 
     boost::asio::placeholders::error, ++endpoint_iterator)); 
    } 
    } 
    else 
    { 
    } 
    } 

    void ClientIO::handle_after_read(const boost::system::error_code& error, std::size_t bytes_transferred) 
    { 
    if (bytes_transferred > 0) 
    { 
    std::istream response_stream(&data_); 
    std::string mex=""; 
    std::getline(response_stream, mex); 
    deserializeMessage(&reply_,mex); 
    if (reply_.opcode == REPL) 
    { 
     cout << "ack received" << endl; 
    } 
    } 
    if (error) 
    { 
    ERROR_MSG(error.message()); 
    } 
    } 

    void ClientIO::handle_after_write(const boost::system::error_code& error) 
    { 
    if (error) 
    { 
    //   ERROR_MSG("Error in write: " << error.message()); 
    } 
    else 
    { 
    messages_queue_.pop_front(); 
    if (!messages_queue_.empty()) 
    { 
     cout << "[w] handle after write" << endl; 
     char* message; 
     if (messages_queue_.front().opcode == DATA) 
     { 
    message=(char*)malloc((10800)*sizeof(char)); 
     } 
     else 
    message=(char*)malloc(1024*sizeof(char)); 
     boost::asio::streambuf request; 
     std::ostream request_stream(&request); 
     serializeMessage(message, messages_queue_.front()); 
     request_stream << message; 
     boost::asio::async_write(socket_, boost::asio::buffer(message, strlen(message)), 
     strand_.wrap(
     boost::bind(&ClientIO::handle_after_write, this, 
     boost::asio::placeholders::error))); 
    } 

    boost::asio::async_read_until(socket_, data_,"\r\n", 
      strand_.wrap(
      boost::bind(&ClientIO::handle_after_read, this, 
     boost::asio::placeholders::error, 
     boost::asio::placeholders::bytes_transferred))); 
    } 
    } 

    ClientIO::~ClientIO() 
    { 
    cout << "service stopped" << endl; 
    } 

}

当一个新的请求是由服务器接收时,它启动一个新的数据管理类形式的连接和一些计算后,写一个队列 用上面的类中的其他服务器(这里只有一个),并在每一个写有对应的ACK

client --write-> server ---write->\ 
            |--server1 
       server <--ACK----</ 

为了实现这一目标,我创建了一个用一个io_service姿态(io_service_test),为类变量,随着数据管理构造以下实例它:

DataManagement::DataManagement(){ 
    tcp::resolver resolver(io_service_test); 
    tcp::resolver::query query(remotehost, remoteport); 
    tcp::resolver::iterator iterator = resolver.resolve(query); 
    cluster = new cluster_head::ClusterIO(io_service_test,iterator); 
    io_service_test.run_one(); 
} 

然后,在计算后,发送数据:

void DataManagement::sendTuple(. . .){ 

    . . . 
    io_service_test.reset(); 
    io_service_test.run(); 
    for (size_t i=0; i<ready_queue.size() ;i++) 
    { 
     cluster->write(fragTuple); 
    } 
} 

相对应的是相同的HTTP Proxy3将例子以相同的方式修改(没有客户端类)。问题是有时候一切都很好,有时会失败,我得到一个堆栈跟踪,有时它不会停止,甚至是分段错误。 我认为这个问题是关闭的io_service管理和类方法的生活,但我无法弄清楚。

  • 任何想法?
  • 你有适合这种情况的一些例子,还是一个实现它的虚拟类?
+0

你可以提供回溯? – tr9sh 2012-02-16 09:27:41

回答

1

简要回顾一下代码,我看到下面的问题。

  1. ClientIO::writeMessage方法发送错误信息给收件人,因为它
    • 分配内存消息
    • 呼叫boost::asio::async_write,这不发送任何数据,但只把一个内部ASIO的请求的队列中的请求,即该消息将在某个时间发送。 boost::asio::buffer不复制消息。它只存储对它的引用。
    • 来电free(message)。即分配给消息的存储器可以在执行排队写请求时被覆盖。
  2. 内存泄漏ClientIO::handle_after_write消息已分配但未释放。
  3. 方法的ClientIO::readMessage没有被strand_.wrap调用包装。

为避免出现问题#1和#2需要使用类似ASIO缓冲区示例的shared_const_buffer类。 要解决问题#3必须以与调用boost::asio::async_write时相同的方式使用strand_.wrap调用。

+0

谢谢@ megabyte1024 – 0x41ndrea 2012-02-24 13:55:26