2013-02-14 151 views
0

我使用boost 1.52.0 32位库与OpenSSL 32位库与非托管Visual C++ 2008为新客户端I正在写信与现有的服务器进行通信。我的测试机器使用Windows 8.我正在使用同步读取和写入。该代码内置于可从C#访问的DLL中,但所有的asio调用都是在使用boost :: thread_group创建的非托管线程上完成的。boost :: asio :: write似乎不工作,而boost :: asio :: read是优秀的

我发现的是,当一个同步读取正在等待数据时,在另一个线程中发生的同步写入看起来被阻塞,并且不会出去 - 至少通过我编码的方式。所以我的问题是 - 同步读取是否需要等待另一个线程中的数据才能完成同步写入?

我已经验证过,如果在另一个线程中没有挂起的读取操作,我可以成功写入数据。我通过在读取之前冻结读取的线程来做到这一点。编写线程然后写出一条消息。然后我解冻了读取的线程,并且它能够成功地从服务器读取关于发送的消息的响应。

以下方法由create_thread方法调用来处理从服务器读取消息脱丝:

void SSLSocket::ProcessServerRequests() 
{ 
    // This method is responsible for processing requests from a server. 
    Byte *pByte; 
    int ByteCount; 
    size_t BytesTransferred; 
    boost::system::error_code Err; 
    Byte* pReqBuf; 
    string s; 
    stringstream ss; 
    // 
    try 
    { 
     ss << "ProcessServerRequests: Worker thread: " << Logger::NumberToString(boost::this_thread::get_id()) << " started.\n"; 
     Log.LogString(ss.str(), LogInfo); 
     // Enable the handlers for the handshaking. 
     IOService->run(); 
     // Wait for the handshake to be sucessfully completed. 
     do 
     { 
     Sleep(50); 
     } while (!HandShakeReady); 
     // 
     sClientIp = pSocket->lowest_layer().remote_endpoint().address().to_string(); 
     uiClientPort = pSocket->lowest_layer().remote_endpoint().port(); 
     ReqAlive = true; 
     // If the thread that handles sending msgs to all servers has not been created yet, then create that one. 
     // This thread is created just once to handle all outbound msgs to all servers. 
     WorkerThreads.create_thread(boost::bind(&SSLSocket::SendWorkerThread)); 
     // Loop until the user quits, or an error is detected. The read method should wait until there is something to read. 
     do 
     { 
     pReqBuf = BufMang.GetPtr(MsgLenBytes); 
     boost::asio::read(*pSocket, boost::asio::buffer(pReqBuf, MsgLenBytes), boost::asio::transfer_exactly(MsgLenBytes), Err); 
     if (Err) 
     { 
      s = Err.message(); 
      if ((s.find("short r")) == string::npos) 
      { 
       ss.str(""); 
       ss << "SSLSocket::ProcessServerRequests: read(1) error = " << Err.message() << "\n. Terminating.\n\n"; 
       Log.LogString(ss.str(), LogError); 
      } 
      Terminate(); 
      // Notify the client that an error has been encountered and the program needs to shut down. TBD. 
     } 
     else 
     { 
      // Get the number of bytes in the message. 
      pByte = pReqBuf; 
      B2I.B.B1 = *pByte++; 
      B2I.B.B2 = *pByte++; 
      B2I.B.B3 = *pByte++; 
      B2I.B.B4 = *pByte; 
      ByteCount = B2I.IntVal; 
      pReqBuf = BufMang.GetPtr(ByteCount); 
      // Do a synchronous read which will hang until the entire message is read off the wire. 
      BytesTransferred = boost::asio::read(*pSocket, boost::asio::buffer(pReqBuf, ByteCount), boost::asio::transfer_exactly(ByteCount), Err); 
      ss.str(""); 
      ss << "SSLSocket::ProcessServerRequests: # bytes rcvd = " << Logger::NumberToString(BytesTransferred).c_str() << " from "; 
      ss << sClientIp.c_str() << " : " << Logger::NumberToString(uiClientPort) << "\n"; 
      Log.LogString(ss.str(), LogDebug2); 
      Log.LogBuf(pReqBuf, (int)BytesTransferred, DisplayInHex, LogDebug3); 
      if ((Err) || (ByteCount != BytesTransferred)) 
      { 
       if (Err) 
       { 
        ss.str(""); 
        ss << "ProcessServerRequests:read(2) error = " << Err.message() << "\n. Terminating.\n\n"; 
       } 
       else 
       { 
        ss.str(""); 
        ss << "ProcessServerRequests:read(3) error - BytesTransferred (" << Logger::NumberToString(BytesTransferred).c_str() << 
        ") != ByteCount (" << Logger::NumberToString(ByteCount).c_str() << "). Terminating.\n\n"; 
       } 
       Log.LogString(ss.str(), LogError); 
       Terminate(); 
       // Notify the client that an error has been encountered and the program needs to shut down. TBD. 
       break; 
      } 
      // Call the C# callback method that will handle the message. 
      Log.LogString("SSLSocket::ProcessServerRequests: sending msg to the C# client.\n\n", LogDebug2); 
      CallbackFunction(this, BytesTransferred, (void*)pReqBuf); 
     } 
     } while (ReqAlive); 
     Log.LogString("SSLSocket::ProcessServerRequests: worker thread done.\n", LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::ProcessServerRequests: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
    } 
} 

以下方法由create_thread方法调用来处理将消息发送到服务器:

void SSLSocket::SendWorkerThread() 
{ 
    // This method handles sending msgs to the server. It is called upon 1st time class initialization. 
    // 
    DWORD WaitResult; 
    Log.LogString("SSLSocket::SendWorkerThread: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " started.\n", LogInfo); 
    // Loop until the user quits, or an error of some sort is thrown. 
    try 
    { 
     do 
     { 
     // If there are one or more msgs that need to be sent to a server, then send them out. 
     if (SendMsgQ.Count() > 0) 
     { 
      Message* pMsg = SendMsgQ.Pop(); 
      // Byte* pBuf = pMsg->pBuf; 
      const Byte* pBuf = pMsg->pBuf; 
      SSLSocket* pSSL = pMsg->pSSL; 
      int BytesInMsg = pMsg->BytesInMsg; 
      boost::system::error_code Error; 
      unsigned int BytesTransferred = boost::asio::write(*pSSL->pSocket, boost::asio::buffer(pBuf, BytesInMsg), Error); 
      string s = "SSLSocket::SendWorkerThread: # bytes sent = "; 
      s += Logger::NumberToString(BytesInMsg).c_str(); 
      s += "\n"; 
      Log.LogString(s, LogDebug2); 
      Log.LogBuf(pBuf, BytesInMsg, DisplayInHex, LogDebug3); 
      if (Error) 
      { 
       Log.LogString("SSLSocket::SendWorkerThread: error sending message - " + Error.message() + "\n", LogError); 
      } 
     } 
     else 
     { 
      // Nothing to send, so go into a wait state. 
      WaitResult = WaitForSingleObject(hEvent, INFINITE); 
      if (WaitResult != 0L) 
      { 
       Log.LogString("SSLSocket::SendWorkerThread: WaitForSingleObject event error. Code = " + Logger::NumberToString(GetLastError()) + ". \n", LogError); 
      } 
     } 
     } while (ReqAlive); 
     Log.LogString("SSLSocket::SendWorkerThread: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " done.\n", LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::SendWorkerThread: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
    } 
} 

因此,如果同步写入应该能够在另一个线程中同步读取挂起时执行,那么有人可以告诉我我的代码做错了什么。

+0

我刚刚发现问题的原因。在将调用从同步转换为异步之后,它仍然表现出同样的问题。事实证明,这是由于一个畸形的消息。服务器根本不会对格式错误的消息做出响应,我刚发现这些消息。所以,我猜测同步调用可能没问题,但现在我暂时将它保持异步。 – 2013-02-20 00:26:25

回答

1

Asio套接字不是线程安全的,因此您可能无法从不同的线程访问它。代替使用 使用async_readasync_write