2017-10-06 10 views
1

我正在使用boost来实现TCP客户端和服务器。在客户端,我必须一个接一个地发送多个文件。我使用单独的机制来通知服务器文件传输。如果服务器准备好接收文件,它将响应客户端并启动传输。如何确保完成TCP文件传输(C++)

我定义异步处理程序写入数据,然后让OS通过调用io_service.run()照顾它。据我所知,io_service.run()阻塞,直到没有更多的处理程序被分派,但这并不意味着数据实际上在远程端收到了正确的?问题是在io_service.run()返回后,我启动下一次传输,但服务器没有收到第一个传输。

我需要实现某种外部机制在远程端通知数据已收到客户端还是我做错了什么?

客户实现:

#include "StdAfx.h" 
#include <boost/bind.hpp> 
#include <boost/thread.hpp> 
#include <boost/enable_shared_from_this.hpp> 
#include <boost/thread.hpp> 
#include "AsyncTCPClient.h" 


AsyncTCPClient::AsyncTCPClient(boost::asio::io_service& iIoService, const std::string& iServerIP, const std::string& iPath) 
    : mResolver(iIoService), mSocket(iIoService) 
{ 
    size_t wPos = iServerIP.find(':'); 
    if(wPos==std::string::npos) 
    { 
     return; 
    } 
    std::string wPortStr = iServerIP.substr(wPos + 1); 
    std::string wServerIP = iServerIP.substr(0, wPos); 

    mSourceFile.open(iPath, std::ios_base::binary | std::ios_base::ate); 
    if(!mSourceFile) 
    { 
     LOG(LOGERROR) << "Failed to open file: " << iPath; 
     return; 
    } 
    size_t wFileSize = mSourceFile.tellg(); 
    mSourceFile.seekg(0); 
    std::ostream wRequestStream(&mRequest); 
    wRequestStream << iPath << "\n" << wFileSize << "\n\n"; 

    LOG(LOGINFO) << "File to transfer: " << iPath; 
    LOG(LOGINFO) << "Filesize: " << wFileSize << " bytes"; 

    tcp::resolver::query wQuery(wServerIP, wPortStr); 
    mResolver.async_resolve(wQuery, boost::bind(&AsyncTCPClient::HandleResolve, this, boost::asio::placeholders::error, boost::asio::placeholders::iterator)); 

} 

AsyncTCPClient::~AsyncTCPClient() 
{ 
} 

void AsyncTCPClient::HandleResolve(const boost::system::error_code & iErr, tcp::resolver::iterator iEndpointIterator) 
{ 
    if(!iErr) 
    { 
     tcp::endpoint wEndpoint = *iEndpointIterator; 
     mSocket.async_connect(wEndpoint, boost::bind(&AsyncTCPClient::HandleConnect, this, boost::asio::placeholders::error, ++iEndpointIterator)); 
    } 
    else 
    { 
     LOG(LOGERROR) << "Error: " << iErr.message(); 
    } 
} 

void AsyncTCPClient::HandleConnect(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator) 
{ 
    if(!iErr) 
    { 
     boost::asio::async_write(mSocket, mRequest, boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error)); 
    } 
    else if(iEndpointIterator != tcp::resolver::iterator()) 
    { 
     mSocket.close(); 
     tcp::endpoint wEndpoint = *iEndpointIterator; 
     mSocket.async_connect(wEndpoint, boost::bind(&AsyncTCPClient::HandleConnect, this, boost::asio::placeholders::error, ++iEndpointIterator)); 
    } 
    else 
    { 
     LOG(LOGERROR) << "Error: " << iErr.message(); 
    } 
} 

void AsyncTCPClient::HandleWriteFile(const boost::system::error_code& iErr) 
{ 
    if(!iErr) 
    { 
     if(mSourceFile) 
     { 
      mSourceFile.read(mBuffer.c_array(), (std::streamsize)mBuffer.size()); 

      // EOF reached 
      if(mSourceFile.gcount() <= 0) 
      { 
       LOG(LOGINFO) << "File transfer done"; 
       return; 
      } 

      //LOG(LOGTRACE) << "Send " << mSourceFile.gcount() << "bytes, total: " << mSourceFile.tellg() << " bytes.\n"; 
      boost::asio::async_write(mSocket, boost::asio::buffer(mBuffer.c_array(), mSourceFile.gcount()), boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error)); 
     } 
     else 
     { 
      LOG(LOGINFO) << "File transfer done"; 
      return; 
     } 
    } 
    else 
    { 
     LOG(LOGERROR) << "Error value: " << iErr.value(); 
     LOG(LOGERROR) << "Error message: " << iErr.message(); 
     throw std::exception(); 
    } 
} 

Server实现:

#include "StdAfx.h" 
#include <boost/array.hpp> 
#include <boost/bind.hpp> 
#include <boost/thread.hpp> 
#include <iostream> 
#include <fstream> 
#include <boost/enable_shared_from_this.hpp> 
#include "AsyncTCPClient.h" 
#include "AsyncTCPServer.h" 
#include "Debug.h" 


AsyncTCPServer::AsyncTCPServer(unsigned short iPort, const std::string iFilePath) 
    :mAcceptor(mIoService, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), iPort), true) 
{ 
    mAsyncTCPConnectionPtr wNewConnection(new AsyncTCPConnection(mIoService, iFilePath)); 
    mAcceptor.async_accept(wNewConnection->Socket(), boost::bind(&AsyncTCPServer::HandleAccept, this, wNewConnection, boost::asio::placeholders::error)); 
    mIoService.run(); 
} 

AsyncTCPServer::~AsyncTCPServer() 
{ 
    mIoService.stop(); 
} 

void AsyncTCPServer::HandleAccept(mAsyncTCPConnectionPtr iCurConnection, const boost::system::error_code& iErr) 
{ 
    if (!iErr) 
    { 
     iCurConnection->Start(); 
    } 
    else 
    { 
     BIOLOG(BioSans::LOGERROR) << " " << iErr << ", " << iErr.message(); 
    } 
} 

连接实现:

#include "StdAfx.h" 
#include <boost/bind.hpp> 
#include <boost/thread.hpp> 
#include <iostream> 
#include <fstream> 
#include "Debug.h" 
#include "AsyncTCPConnection.h" 

AsyncTCPConnection::AsyncTCPConnection(boost::asio::io_service& iIoService, const std::string iFilePath) 
    : mSocket(iIoService), mFileSize(0), mFilePath(iFilePath) 
{ 
} 

AsyncTCPConnection::~AsyncTCPConnection() 
{ 
} 

void AsyncTCPConnection::Start() 
{ 
    LOG(LOGINFO) << "Start"; 
    async_read_until(mSocket, mRequestBuffer, "\n\n", boost::bind(&AsyncTCPConnection::HandleReadRequest, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
} 

void AsyncTCPConnection::HandleReadRequest(const boost::system::error_code& iErr, std::size_t iBytesTransferred) 
{ 
    if(iErr) 
    { 
     return HandleError(__FUNCTION__, iErr); 
    } 
    LOG(LOGTRACE) << "(" << iBytesTransferred << ")" << ", in_avail = " << mRequestBuffer.in_avail() << ", size = " << mRequestBuffer.size() << ", max_size = " << mRequestBuffer.max_size(); 

    std::istream wRequestStream(&mRequestBuffer); 
    std::string wFilePath; 
    wRequestStream >> wFilePath; 
    wRequestStream >> mFileSize; 
    wRequestStream.read(mBuffer.c_array(), 2); 

    mOutputFile.open(mFilePath, std::ios_base::binary); 

    if(!mOutputFile) 
    { 
     LOG(LOGERROR) << "Failed to open: " << wFilePath; 
     return; 
    } 
    do 
    { 
     wRequestStream.read(mBuffer.c_array(), (std::streamsize)mBuffer.size()); 
     LOG(LOGTRACE) << "Write " << wRequestStream.gcount() << " bytes"; 
     mOutputFile.write(mBuffer.c_array(), wRequestStream.gcount()); 
    } 
    while(wRequestStream.gcount() > 0); 
    async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()),boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
} 

void AsyncTCPConnection::HandleReadFileContent(const boost::system::error_code& iErr, std::size_t iBytesTransferred) 
{ 
    if(iBytesTransferred>0) 
    { 
     mOutputFile.write(mBuffer.c_array(), (std::streamsize)iBytesTransferred); 
     LOG(LOGTRACE) << "Received " << mOutputFile.tellp() << " bytes"; 
     if (mOutputFile.tellp()>=(std::streamsize)mFileSize) 
     { 
      return; 
     } 
    } 
    if(iErr) 
    { 
     return HandleError(__FUNCTION__, iErr); 
    } 
    async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()), boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
} 

void AsyncTCPConnection::HandleError(const std::string& function_name, const boost::system::error_code& err) 
{ 
    LOG(LOGERROR) << " in " << function_name <<" due to " << err <<" " << err.message(); 
} 

代码发送文件:

void SendFile(std::string iFilePath, std::string iServerIP) 
{ 
    static int wRetries = 0; 
    try 
    { 
     boost::asio::io_service wIoService; 
     LOG(LOGINFO) << "Sending data to: " << iServerIP; 
     LOG(LOGINFO) << "Filename is: " << iFilePath; 

     AsyncTCPClient client(wIoService, iServerIP, iFilePath); 
     wIoService.run(); 
     // here I want to make sure that the data got to the remote host 
     // it looks like wIoService.run() returns once bytes are written to the socket 

    } 
    catch(std::exception) 
    { 
     // retry 3 times in case something goes wrong 
     if(wRetries < 3) 
     { 
      wRetries++; 
      LOG(LOGWARNING) << "Problem sending file : " << iFilePath << " to address: " << iServerIP; 
      LOG(LOGWARNING) << "Retry #" << wRetries; 
      SendFile(iFilePath, iServerIP); 
     } 
     else 
     { 
      LOG(LOGERROR) << "Unable to send file: " << iFilePath << " to address: " << iServerIP; 
      wRetries = 0; 
      return; 
     } 
    } 
    wRetries = 0; 
} 
+0

如果你需要知道,如果/当远程方已收到和/或处理的数据,然后远程将不得不告诉你。 –

回答

0

你可以使用“的boost :: ASIO :: io_service对象::工作”继续,直至要关闭你的流程的IO服务线程活着。否则,当所有发布的任务完成时,io_service :: run将会返回。

http://www.boost.org/doc/libs/1_65_1/doc/html/boost_asio/reference/io_service__work.html

我不认为你会想退出,一遍又一遍地重新线程每次传输。

您可以使用条件变量的信号,当你想关闭io_service对象线程,然后销毁工作对象,或者只是简单地销毁工作对象。

至于当服务器收到你发送的一切认识。你可以在你的协议中设计一些东西,或者仅仅依靠TCP的保证方面。我建议读一下TCP和IO-Completion。

+0

谢谢,根据我的理解,我需要确保在应用程序级别正确接收数据。我会实施某种确认机制。 – ajora