0

我正在使用 boost 1.52.0 32 位库、OpenSSL 32 位库和非托管 Visual C++ 2008,编写一个与现有服务器通信的新客户端。我的测试机器运行的是 Windows 8。我使用的是同步读写。代码被编译为一个 DLL,可以从 C# 调用,但所有 asio 调用都是在用 boost::thread_group 创建的非托管线程上完成的。

我发现,当同步读取正在等待数据时,在另一个线程中发起的同步写入似乎会被阻塞而无法返回——至少按我的编码方式是这样。所以我的问题是:当一个线程中的同步读取正在等待数据时,另一个线程中的同步写入是否应该能够完整执行?

我已经验证,当另一个线程中没有挂起的读取时,我可以成功写出数据。我的做法是在读取线程即将开始读取之前将其冻结,然后让写入线程发送一条消息。之后我解冻读取线程,它能够成功地从服务器读取到针对该已发送消息的响应。

以下方法由 create_thread 方法调用,负责从线路上读取服务器发来的消息:

void SSLSocket::ProcessServerRequests()
{
   // This method is responsible for processing requests from a server.
   Byte *pByte;
   int ByteCount;
   size_t BytesTransferred;
   boost::system::error_code Err;
   Byte* pReqBuf;
   string s;
   stringstream ss;
   //
   try
   {
      ss << "ProcessServerRequests: Worker thread: " << Logger::NumberToString(boost::this_thread::get_id()) << " started.\n";
      Log.LogString(ss.str(), LogInfo);
      // Enable the handlers for the handshaking.
      IOService->run();
      // Wait for the handshake to be sucessfully completed.
      do
      {
         Sleep(50);
      } while (!HandShakeReady);
      //
      sClientIp = pSocket->lowest_layer().remote_endpoint().address().to_string();
      uiClientPort = pSocket->lowest_layer().remote_endpoint().port();
      ReqAlive = true;
      // If the thread that handles sending msgs to all servers has not been created yet, then create that one.
      // This thread is created just once to handle all outbound msgs to all servers.
      WorkerThreads.create_thread(boost::bind(&SSLSocket::SendWorkerThread));
      // Loop until the user quits, or an error is detected.  The read method should wait until there is something to read.
      do
      {
         pReqBuf = BufMang.GetPtr(MsgLenBytes);
         boost::asio::read(*pSocket, boost::asio::buffer(pReqBuf, MsgLenBytes), boost::asio::transfer_exactly(MsgLenBytes), Err);
         if (Err)
         {
            s = Err.message();
            if ((s.find("short r")) == string::npos)
            {
               ss.str("");
               ss << "SSLSocket::ProcessServerRequests: read(1) error = " << Err.message() << "\n.  Terminating.\n\n";
               Log.LogString(ss.str(), LogError);
            }
            Terminate();
            // Notify the client that an error has been encountered and the program needs to shut down.  TBD.
         }
         else
         {
            // Get the number of bytes in the message.
            pByte = pReqBuf;
            B2I.B.B1 = *pByte++;
            B2I.B.B2 = *pByte++;
            B2I.B.B3 = *pByte++;
            B2I.B.B4 = *pByte;
            ByteCount = B2I.IntVal;
            pReqBuf = BufMang.GetPtr(ByteCount);
            // Do a synchronous read which will hang until the entire message is read off the wire.
            BytesTransferred = boost::asio::read(*pSocket, boost::asio::buffer(pReqBuf, ByteCount), boost::asio::transfer_exactly(ByteCount), Err);
            ss.str("");
            ss << "SSLSocket::ProcessServerRequests: # bytes rcvd = " << Logger::NumberToString(BytesTransferred).c_str() << " from ";
            ss << sClientIp.c_str() << " : " << Logger::NumberToString(uiClientPort) << "\n";
            Log.LogString(ss.str(), LogDebug2);
            Log.LogBuf(pReqBuf, (int)BytesTransferred, DisplayInHex, LogDebug3);
            if ((Err) || (ByteCount != BytesTransferred))
            {
               if (Err)
               {
                  ss.str("");
                  ss << "ProcessServerRequests:read(2) error = " << Err.message() << "\n.  Terminating.\n\n";
               }
               else
               {
                  ss.str("");
                  ss << "ProcessServerRequests:read(3) error - BytesTransferred (" << Logger::NumberToString(BytesTransferred).c_str() <<
                     ") != ByteCount (" << Logger::NumberToString(ByteCount).c_str() << ").  Terminating.\n\n";
               }
               Log.LogString(ss.str(), LogError);
               Terminate();
               // Notify the client that an error has been encountered and the program needs to shut down.  TBD.
               break;
            }
            // Call the C# callback method that will handle the message.
            Log.LogString("SSLSocket::ProcessServerRequests: sending msg to the C# client.\n\n", LogDebug2);
            CallbackFunction(this, BytesTransferred, (void*)pReqBuf);
         }
      } while (ReqAlive);
      Log.LogString("SSLSocket::ProcessServerRequests: worker thread done.\n", LogInfo);
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::ProcessServerRequests: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
   }
}

以下方法由 create_thread 方法调用,负责向服务器发送消息:

void SSLSocket::SendWorkerThread()
{
   // This method handles sending msgs to the server.  It is called upon 1st time class initialization.
   //
   DWORD WaitResult;
   Log.LogString("SSLSocket::SendWorkerThread: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " started.\n", LogInfo);
   // Loop until the user quits, or an error of some sort is thrown.
   try
   {
      do
      {
         // If there are one or more msgs that need to be sent to a server, then send them out.
         if (SendMsgQ.Count() > 0)
         {
            Message* pMsg = SendMsgQ.Pop();
            // Byte* pBuf = pMsg->pBuf;
            const Byte* pBuf = pMsg->pBuf;
            SSLSocket* pSSL = pMsg->pSSL;
            int BytesInMsg = pMsg->BytesInMsg;
            boost::system::error_code Error;
            unsigned int BytesTransferred = boost::asio::write(*pSSL->pSocket, boost::asio::buffer(pBuf, BytesInMsg), Error);
            string s = "SSLSocket::SendWorkerThread: # bytes sent = ";
            s += Logger::NumberToString(BytesInMsg).c_str();
            s += "\n";
            Log.LogString(s, LogDebug2);
            Log.LogBuf(pBuf, BytesInMsg, DisplayInHex, LogDebug3);
            if (Error)
            {
               Log.LogString("SSLSocket::SendWorkerThread: error sending message - " + Error.message() + "\n", LogError);
            }
         }
         else
         {
            // Nothing to send, so go into a wait state.
            WaitResult = WaitForSingleObject(hEvent, INFINITE);
            if (WaitResult != 0L)
            {
               Log.LogString("SSLSocket::SendWorkerThread: WaitForSingleObject event error.  Code = " + Logger::NumberToString(GetLastError()) + ". \n", LogError);
            }
         }
      } while (ReqAlive);
      Log.LogString("SSLSocket::SendWorkerThread: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " done.\n", LogInfo);
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::SendWorkerThread: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
   }
}

因此,如果在另一个线程中有同步读取挂起时,同步写入本应能够执行,那么有人能告诉我我的代码哪里做错了吗?

4

1 回答 1

1

Asio 套接字不是线程安全的,因此您可能无法从不同的线程访问它。请改用 async_read 和 async_write。

于 2013-02-15T11:09:38.147 回答