7

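I want to implement a Boost Asio pattern using one thread for the GUI and a worker thread for some socket IO.

The worker thread will use boost::asio::io_service to manage a socket client. All operations on sockets will be performed by the worker thread only.

The GUI thread needs to send messages to and receive messages from the worker thread.

I can't figure out how to implement this pattern using Boost Asio.

I've already implemented the socket communication in the standard Asio way (I call io_service.run() from the worker thread and I use async_read_some / async_send). I don't need strands because io_service.run() is called from the worker thread only.

Now I'm trying to add the cross-thread message queue. How can I implement it?

Should I run the io_service from the GUI thread too?

Or should I just use strands with post to post messages from the GUI thread to the worker thread (without calling io_service.run() or io_service.poll_one() from the GUI thread), and use the operating system's GUI message loop to post messages from the worker thread to the GUI thread?

If I need to call io_service.run() or io_service.poll_one() from the GUI thread too, do I need to use strands on the socket operations, since the io_service is shared between two threads?

EDIT: to clarify my question, I want to do as much as I can with Boost Asio to implement the message queue, relying on other libraries only if Boost Asio can't do the job.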


3 Answers

10

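Message passing is fairly generic. There are various ways to approach the problem, and the right solution is likely to depend on the desired behavior: for example, blocking or non-blocking, control over memory allocation, context, and so on.

  • Boost.Lockfree provides thread-safe, lock-free, non-blocking queues for single/multi consumers/producers. It tends to lend itself well to event loops, where it is not ideal for the consumer to be blocked waiting for the producer to signal a synchronization construct.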

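    // Without a capacity<> template argument the queue needs an initial
    // node count; 128 is an arbitrary choice.
    boost::lockfree::queue<message_type> worker_message_queue(128);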
    
    void send_worker_message(const message_type& message)
    {
      // Add message to worker message queue.
      worker_message_queue.push(message);
    
      // Add work to worker_io_service that will process the queue.
      worker_io_service.post(&process_message); 
    }
    
    void process_message()
    {
      message_type message;
    
      // If the message was not retrieved, then return early.
      if (!worker_message_queue.pop(message)) return;
    
      ...
    }
    
  • Alternatively, Boost.Asio's io_service can function as a queue. The message just needs to be bound to the designated handler.

    void send_worker_message(const message_type& message)
    {
      // Add work to worker_io_service that will process the message.
      worker_io_service.post(boost::bind(&process_message, message)); 
    }
    
    void process_message(message_type& message)
    {
      ...
    }
    

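Comments indicate that the desire is for more than message passing. It sounds as though the end goal is to allow one thread to cause another thread to invoke arbitrary functions.

If this is the case, then consider:

  • Using Boost.Signals2 for a managed signals and slots implementation. This allows arbitrary functions to register with a signal.
  • Using Boost.Asio's io_service to set up signal emissions. If the GUI thread and the worker thread each have their own io_service, then the worker thread can post a handler into the GUI thread's io_service that will emit a signal. In its main loop, the GUI thread polls its io_service, emits the signal, and causes the slots to be invoked from within the GUI thread's context.

Here is a complete example in which two threads pass a message (as an unsigned int) to one another and cause arbitrary functions to be invoked within the other thread.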

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/optional.hpp>
#include <boost/signals2.hpp>
#include <boost/thread.hpp>
#include <boost/utility/in_place_factory.hpp>

/// @brief io_service dedicated to gui.
boost::asio::io_service gui_service;

/// @brief io_service dedicated to worker.
boost::asio::io_service worker_service;

/// @brief work to keep gui_service from stopping prematurely.
boost::optional<boost::asio::io_service::work> gui_work;

/// @brief hello slot.
void hello(int x)
{
  std::cout << "hello with " << x << " from thread " << 
               boost::this_thread::get_id() << std::endl;
}

/// @brief world slot.
void world(int x)
{
  std::cout << "world with " << x << " from thread " << 
               boost::this_thread::get_id() << std::endl;
}

/// @brief Type for signals.
typedef boost::signals2::signal<void (int)> signal_type;

void emit_then_notify_gui(signal_type& signal, unsigned int x);

/// @brief Emit signals then message worker.
void emit_then_notify_worker(signal_type& signal, unsigned int x)
{
  // Emit signal, causing registered slots to run within this thread.
  signal(x);

  // If x has been exhausted, then cause gui service to run out of work.
  if (!x)
  {
    gui_work = boost::none;
  }
  // Otherwise, post work into worker service.
  else
  {
    std::cout << "GUI thread: " << boost::this_thread::get_id() << 
                 " scheduling other thread to emit signals" << std::endl;
    worker_service.post(boost::bind(
        &emit_then_notify_gui,
        boost::ref(signal), --x));
  }  
}

/// @brief Emit signals then message gui.
void emit_then_notify_gui(signal_type& signal, unsigned int x)
{
  // Emit signal, causing registered slots to run within this thread.
  signal(x);

  // If x has been exhausted, then cause gui service to run out of work.
  if (!x)
  {
    gui_work = boost::none;
  }
  // Otherwise, post more work into gui.
  else
  {
    std::cout << "Worker thread: " << boost::this_thread::get_id() << 
                 " scheduling other thread to emit signals" << std::endl;
    gui_service.post(boost::bind(
        &emit_then_notify_worker,
        boost::ref(signal), --x));
  }  
}

void worker_main()
{
  std::cout << "Worker thread: " << boost::this_thread::get_id() << std::endl;
  worker_service.run();
}

int main()
{
  signal_type signal;

  // Connect slots to signal.
  signal.connect(&hello);
  signal.connect(&world);

  boost::optional<boost::asio::io_service::work> worker_work(
     boost::ref(worker_service));
  gui_work = boost::in_place(boost::ref(gui_service));

  std::cout << "GUI thread: " << boost::this_thread::get_id() << std::endl;

  // Spawn off worker thread.
  boost::thread worker_thread(&worker_main);

  // Add work to worker.
  worker_service.post(boost::bind(
      &emit_then_notify_gui,
      boost::ref(signal), 3));

  // Mocked up GUI main loop.
  while (!gui_service.stopped())
  {
    // Do other GUI actions.

    // Perform message processing.
    gui_service.poll_one();
  }

  // Cleanup.
  worker_work = boost::none;
  worker_thread.join();
}

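And its output:

GUI thread: b7f2f6d0
Worker thread: b7f2eb90
hello with 3 from thread b7f2eb90
world with 3 from thread b7f2eb90
Worker thread: b7f2eb90 scheduling other thread to emit signals
hello with 2 from thread b7f2f6d0
world with 2 from thread b7f2f6d0
GUI thread: b7f2f6d0 scheduling other thread to emit signals
hello with 1 from thread b7f2eb90
world with 1 from thread b7f2eb90
Worker thread: b7f2eb90 scheduling other thread to emit signals
hello with 0 from thread b7f2f6d0
world with 0 from thread b7f2f6d0
answered 2013-08-01T16:40:56.050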
1

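If you have only one worker, then it's rather easy.

ASIO's handlers are executed by the thread that calls io_service.run(). In your case, that means only one thread, the worker, can execute callback handlers, so you need not worry about thread safety between handlers.

Your GUI thread, assuming it has access to the socket, can call boost::asio::async_write() to start a write; the callback handler, however, will be executed in the worker thread. Note that a single Asio socket object is not safe for concurrent use from several threads, so a direct call is only safe while the worker thread is not operating on that same socket; handing the write to the worker with io_service.post() avoids the problem.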

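From my (admittedly limited) experience, I have used this pattern:

  1. The business logic thread (which could be your GUI thread) can easily schedule a write to one of its clients by calling boost::asio::async_write(): the worker thread will take care of it.
  2. The worker thread starts some boost::asio::async_read() calls and builds "business logic packets". By that I mean it constructs meaningful messages from the raw data (they could be subclasses of a custom class such as Packet or Event, or whatever you like).
  3. When the worker thread has enough data to build such a message, it does so and then enqueues it in a thread-safe queue that the GUI thread pulls from.
  4. The GUI (or business logic) thread then processes the message.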
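Steps 3 and 4 can be sketched with the standard library alone. This is only an illustration under assumptions: ThreadSafeQueue, Packet and demo_worker_to_gui are hypothetical names, and a real GUI would call try_pop from its idle or timer callback instead of a plain loop.

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical "business logic packet" built by the worker from raw data.
struct Packet { std::string payload; };

// Minimal mutex-protected queue; try_pop never blocks, so the GUI thread
// can drain it from its event loop without stalling.
template <typename T>
class ThreadSafeQueue {
public:
  void push(T value) {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(std::move(value));
  }

  // Returns false immediately when the queue is empty.
  bool try_pop(T& out) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) return false;
    out = std::move(queue_.front());
    queue_.pop();
    return true;
  }

private:
  std::mutex mutex_;
  std::queue<T> queue_;
};

// Simulates the flow: the worker enqueues packets, the "GUI" drains them.
std::vector<std::string> demo_worker_to_gui() {
  ThreadSafeQueue<Packet> inbox;

  std::thread worker([&inbox] {
    for (int i = 0; i < 3; ++i)
      inbox.push(Packet{"msg " + std::to_string(i)});
  });
  worker.join();  // In a real program the worker would keep running.

  std::vector<std::string> seen;
  Packet packet;
  while (inbox.try_pop(packet)) seen.push_back(packet.payload);

  assert(seen.size() == 3);
  return seen;
}
```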

Let me know if this is unclear or if I can be of more help.

answered 2013-07-31T17:43:14.867
1

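The way I exchange messages between 2+ threads is to store them in a container such as a queue and then use an event to notify the worker thread to wake up and process them. Here is an example: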

void SSLSocket::SendToServer(const int bytesInMsg, Byte* pBuf)
{
   // This method creates a msg object and saves it in the SendMsgQ object.
   //
   Message* pMsg = Message::GetMsg(this, bytesInMsg, pBuf);
   SendMsgQ.Push(pMsg);
   // Signal the send worker thread to wake up and send the msg to the server.
   SetEvent(hEvent);
}

In the header file:

std::queue<Message*> SendMsgQueue; // Queue of msgs to send to the server.

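The above code is for Microsoft VC++. If your development environment is different, you may have to use different classes or methods, but the idea should be the same.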
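For environments without the Win32 event API, the same queue-plus-wakeup idea can be sketched with std::condition_variable. This is an assumption-laden illustration, not a port of the code above: SendQueue, close and demo_send_worker are hypothetical names, and plain strings stand in for the Message objects.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Queue whose consumer sleeps until a message arrives or the queue is closed.
class SendQueue {
public:
  void push(std::string msg) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(std::move(msg));
    }
    ready_.notify_one();  // Plays the role of SetEvent(hEvent).
  }

  // Tells the consumer that no further messages will arrive.
  void close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    ready_.notify_all();
  }

  // Blocks like WaitForSingleObject; returns false once closed and drained.
  bool wait_and_pop(std::string& out) {
    std::unique_lock<std::mutex> lock(mutex_);
    ready_.wait(lock, [this] { return !queue_.empty() || closed_; });
    if (queue_.empty()) return false;
    out = std::move(queue_.front());
    queue_.pop();
    return true;
  }

private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::queue<std::string> queue_;
  bool closed_ = false;
};

// The worker "sends" every queued message; the caller acts as the GUI thread.
std::vector<std::string> demo_send_worker() {
  SendQueue queue;
  std::vector<std::string> sent;

  std::thread worker([&queue, &sent] {
    std::string msg;
    while (queue.wait_and_pop(msg)) sent.push_back(msg);
  });

  queue.push("first");
  queue.push("second");
  queue.push("third");
  queue.close();
  worker.join();

  assert(sent.size() == 3);
  return sent;
}
```

Using a predicate with wait() guards against spurious wakeups and against a notification that arrives before the worker starts waiting.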

Edit - a more complete code example

#include "StdAfx.h"
#include "SSLSocket.h"

boost::shared_ptr< boost::asio::io_service > SSLSocket::IOService;
bool SSLSocket::LobbySocketOpen = false;
SSLSocket* SSLSocket::pSSLLobby = 0;
int SSLSocket::StaticInit = 0;
Callback SSLSocket::CallbackFunction;
BufferManagement SSLSocket::BufMang;
volatile bool SSLSocket::ReqAlive = true;
Logger SSLSocket::Log;
HANDLE SSLSocket::hEvent;
bool SSLSocket::DisplayInHex;
ConcurrentMsgQueue SSLSocket::SendMsgQ;
bool SSLSocket::RcvThreadCreated = 0;
BufferManagement* Message::pBufMang;
bool SSLSocket::ShuttingDown = false;
std::vector<SSLSocket *> SocketList;

SSLSocket::SSLSocket(const bool logToFile, const bool logToConsole, const bool displayInHex,
   const LogLevel levelOfLog, const string& logFileName, const int bufMangLen) : pSocket(0)
{
   // SSLSocket Constructor.
   // If the static members have not been initialized yet, then initialize them.
   LockCode = new Lock();
   if (!StaticInit)
   {
      SocketList.push_back(this);
      DisplayInHex = displayInHex;
      BufMang.Init(bufMangLen);
      Message::SetBufMang(&BufMang);
      // This constructor enables logging according to the vars passed in.
      Log.Init(logToFile, logToConsole, levelOfLog, logFileName);
      StaticInit = 1;
      hEvent = CreateEvent(NULL, false, false, NULL);
      // Define the ASIO IO service object.
      // IOService = new boost::shared_ptr<boost::asio::io_service>(new boost::asio::io_service);
      boost::shared_ptr<boost::asio::io_service> IOServ(new boost::asio::io_service);
      IOService = IOServ;
      pSSLLobby = this;
   }
}

SSLSocket::~SSLSocket(void)
{
   if (pSocket)
      delete pSocket;
   if (--StaticInit == 0)
      CloseHandle(hEvent);
}

void SSLSocket::Connect(SSLSocket* psSLS, const string& serverPath, string& port)
{
   // Connects to the server.
   // serverPath - specifies the path to the server.  Can be either an ip address or url.
   // port - port server is listening on.
   //
   try
   {
      LockCode->Acquire(); // Single thread the code.
      // If the user has tried to connect before, then make sure everything is clean before trying to do so again.
      if (pSocket)
      {
         delete pSocket;
         pSocket = 0;
      }                                                                                                  
      // If serverPath is a URL, then resolve the address.
      if ((serverPath[0] < '0') || (serverPath[0] > '9')) // Assumes that the first char of the server path is not a number when resolving to an ip addr.
      {
         // Create the resolver and query objects to resolve the host name in serverPath to an ip address.
         boost::asio::ip::tcp::resolver resolver(*IOService);
         boost::asio::ip::tcp::resolver::query query(serverPath, port);
         boost::asio::ip::tcp::resolver::iterator EndpointIterator = resolver.resolve(query);
         // Set up an SSL context.
         boost::asio::ssl::context ctx(*IOService, boost::asio::ssl::context::tlsv1_client);
         // Specify to not verify the server certificate right now.
         ctx.set_verify_mode(boost::asio::ssl::context::verify_none);
         // Init the socket object used to initially communicate with the server.
         pSocket = new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(*IOService, ctx);
         //
         // The thread we are on now, is most likely the user interface thread.  Create a thread to handle all incoming socket work messages.
         // Only one thread is created to handle the socket I/O reading and another thread is created to handle writing.
         if (!RcvThreadCreated)
         {
            WorkerThreads.create_thread(boost::bind(&SSLSocket::RcvWorkerThread, this));
            RcvThreadCreated = true;
            WorkerThreads.create_thread(boost::bind(&SSLSocket::SendWorkerThread, this));
         }
         // Try to connect to the server.  Note - add timeout logic at some point.
         boost::asio::async_connect(pSocket->lowest_layer(), EndpointIterator,
            boost::bind(&SSLSocket::HandleConnect, this, boost::asio::placeholders::error));
      }
      else
      {
         // serverPath is an ip address, so try to connect using that.
         //
         stringstream ss1;
         boost::system::error_code EC;
         ss1 << "SSLSocket::Connect: Preparing to connect to game server " << serverPath << " : " << port << ".\n";
         Log.LogString(ss1.str(), LogInfo);
         // Create an endpoint with the specified ip address.
         const boost::asio::ip::address IP(boost::asio::ip::address::from_string(serverPath));
         int iport = atoi(port.c_str());
         const boost::asio::ip::tcp::endpoint EP(IP, iport);
         // Set up an SSL context.
         boost::asio::ssl::context ctx(*IOService, boost::asio::ssl::context::tlsv1_client);
         // Specify to not verify the server certificate right now.
         ctx.set_verify_mode(boost::asio::ssl::context::verify_none);
         // Init the socket object used to initially communicate with the server.
         pSocket = new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(*IOService, ctx);
         //
         // Try to connect to the server.  Note - add timeout logic at some point.
         pSocket->next_layer().connect(EP, EC);
         if (EC)
         {
            // Log an error.  This worker thread should exit gracefully after this.
            stringstream ss;
            ss << "SSLSocket::Connect: connect failed to " << sClientIp << " : " << uiClientPort << ".  Error: " << EC.message() + ".\n";
            Log.LogString(ss.str(), LogError);
         }
         stringstream ss;
         ss << "SSLSocket::Connect: Calling HandleConnect for game server " << serverPath << " : " << port << ".\n";
         Log.LogString(ss.str(), LogInfo);
         HandleConnect(EC);
      }
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::Connect: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
   LockCode->Release();
}

void SSLSocket::SendToServer(const int bytesInMsg, Byte* pBuf)
{
   // This method creates a msg object and saves it in the SendMsgQ object.
   // sends the number of bytes specified by bytesInMsg in pBuf to the server.
   //
   Message* pMsg = Message::GetMsg(this, bytesInMsg, pBuf);
   SendMsgQ.Push(pMsg);
   // Signal the send worker thread to wake up and send the msg to the server.
   SetEvent(hEvent);
}


void SSLSocket::SendWorkerThread(SSLSocket* psSLS)
{
   // This thread method gets called to process the messages to be sent to the server.
   //
   // Since this has to be a static method, call a method on the class to handle server requests.
   psSLS->ProcessSendRequests();
}

void SSLSocket::ProcessSendRequests()
{
   // This method handles sending msgs to the server.
   //
   std::stringstream ss;
   DWORD WaitResult;
   Log.LogString("SSLSocket::ProcessSendRequests: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " started.\n", LogInfo);
   // Loop until the user quits, or an error of some sort is thrown.
   try
   {
      do
      {
         // If there are one or more msgs that need to be sent to a server, then send them out.
         if (SendMsgQ.Count() > 0)
         {
            Message* pMsg = SendMsgQ.Front();
            SSLSocket* pSSL = pMsg->pSSL;
            SendMsgQ.Pop();
            const Byte* pBuf = pMsg->pBuf;
            const int BytesInMsg = pMsg->BytesInMsg;
            boost::system::error_code Error;
            LockCode->Acquire(); // Single thread the code.
            try
            {
               boost::asio::async_write(*pSSL->pSocket, boost::asio::buffer(pBuf, BytesInMsg), boost::bind(&SSLSocket::HandleWrite, this,
                  boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
            }
            catch (std::exception& e)
            {
               stringstream ss;
               ss << "SSLSocket::ProcessSendRequests: threw an error - " << e.what() << ".\n";
               Log.LogString(ss.str(), LogError);
            }
            ss.str(std::string());
            ss << "SSLSocket::ProcessSendRequests: # bytes sent = " << BytesInMsg << "\n";
            Log.LogString(ss.str(), LogDebug2);
            Log.LogBuf(pBuf, BytesInMsg, DisplayInHex, LogDebug3);
            LockCode->Release();
         }
         else
         {
            // Nothing to send, so go into a wait state.
            WaitResult = WaitForSingleObject(hEvent, INFINITE);
            if (WaitResult != 0L)
            {
               Log.LogString("SSLSocket::ProcessSendRequests: WaitForSingleObject event error.  Code = " + Logger::NumberToString(GetLastError()) + ". \n", LogError);
            }
         }
      } while (ReqAlive);
      Log.LogString("SSLSocket::ProcessSendRequests: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " done.\n", LogInfo);
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::ProcessSendRequests: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

void SSLSocket::HandleWrite(const boost::system::error_code& error, size_t bytesTransferred)
{
   // This method is called after a msg has been written out to the socket.  Nothing to do really since reading is handled by the HandleRead method.
   //
   std::stringstream ss;
   try
   {
      if (error)
      {
         ss << "SSLSocket::HandleWrite: failed - " << error.message() << ".\n";
         Log.LogString(ss.str(), LogError);
         Stop();
      }
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::HandleWrite: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

void SSLSocket::RcvWorkerThread(SSLSocket* psSLS)
{
   // This is the method that gets called when the receive thread is created by this class.
   // This thread method focuses on processing messages received from the server.
   //
   // Since this has to be a static method, call an instance method on the class to handle server requests.
   psSLS->InitAsynchIO();
}

void SSLSocket::InitAsynchIO()
{
   // This method is responsible for initiating asynch i/o.
   boost::system::error_code Err;
   string s;
   stringstream ss;
   //
   try
   {
      ss << "SSLSocket::InitAsynchIO: Worker thread - " << Logger::NumberToString(boost::this_thread::get_id()) << " started.\n";
      Log.LogString(ss.str(), LogInfo);
      // Enable the handlers for asynch i/o.  The thread will hang here until the stop method has been called or an error occurs.
      // Add a work object so the thread will be dedicated to handling asynch i/o.
      boost::asio::io_service::work work(*IOService);
      IOService->run();
      Log.LogString("SSLSocket::InitAsynchIO: receive worker thread done.\n", LogInfo);
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::InitAsynchIO: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

void SSLSocket::HandleConnect(const boost::system::error_code& error)
{
   // This method is called asynchronously when the server has responded to the connect request.
   std::stringstream ss;
   try
   {
      if (!error)
      {
         LockCode->Acquire(); // Single thread the code.
         pSocket->async_handshake(boost::asio::ssl::stream_base::client,
            boost::bind(&SSLSocket::HandleHandshake, this, boost::asio::placeholders::error));
         LockCode->Release();
         ss << "SSLSocket::HandleConnect: From worker thread " << Logger::NumberToString(boost::this_thread::get_id()) << ".\n";
         Log.LogString(ss.str(), LogInfo);
      }
      else
      {
         // Log an error.  This worker thread should exit gracefully after this.
         ss << "SSLSocket::HandleConnect: connect failed.  Error: " << error.message() + ".\n";
         Log.LogString(ss.str(), LogError);
         Stop();
      }
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::HandleConnect: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

void SSLSocket::HandleHandshake(const boost::system::error_code& error)
{
   // This method is called asynchronously when the server has responded to the handshake request.
   std::stringstream ss;
   try
   {
      if (!error)
      {
         // Try to send the first message that the server is expecting.  This msg tells the server we want to connect.
         //
         unsigned char Msg[5] = {0x17, 0x00, 0x00, 0x00, 0x06};
         boost::system::error_code Err;
         //
         if (pSSLLobby == this)
            LobbySocketOpen = true;
         sClientIp = pSocket->lowest_layer().remote_endpoint().address().to_string();
         uiClientPort = pSocket->lowest_layer().remote_endpoint().port();
         ReqAlive = true;
         LockCode->Acquire(); // Single thread the code.
         int Count = boost::asio::write(*pSocket, boost::asio::buffer(Msg), boost::asio::transfer_exactly(5), Err);
         if (Err)
         {
            ss << "SSLSocket::HandleHandshake: write failed - " << Err.message() << ".\n";
            Log.LogString(ss.str(), LogInfo);
         }
         HandleFirstWrite(Err, Count);
         LockCode->Release();
         ss.str("");
         ss << "SSLSocket::HandleHandshake: From worker thread " << boost::this_thread::get_id() << ".\n";
      }
      else
      {
         ss << "SSLSocket::HandleHandshake: failed - " << error.message() << ".\n";
         IOService->stop();
      }
      Log.LogString(ss.str(), LogInfo);
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::HandleHandshake: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

void SSLSocket::HandleFirstWrite(const boost::system::error_code& error, size_t bytesTransferred)
{
   // This method is called after a msg has been written out to the socket.  This method is only called from HandleHandShake.
   std::stringstream ss;
   try
   {
      if (!error)
      {
         // Notify the UI that we are now connected.  Create a 6 byte msg for this.
         pDataBuf = BufMang.GetPtr(6);
         BYTE* p = pDataBuf;
         // Create msg type 500
         *p = 244;
         *++p = 1;
         CallbackFunction(this, 2, (void*)pDataBuf);
         // Get the 1st 4 bytes of the next msg, which is always the length of the msg.
         pDataBuf = BufMang.GetPtr(MsgLenBytes);
         try
         {
            boost::asio::async_read(*pSocket, boost::asio::buffer(pDataBuf, MsgLenBytes), boost::bind(&SSLSocket::HandleRead, this,
               boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
         }
         catch (std::exception& e)
         {
            stringstream ss;
            ss << "SSLSocket::HandleFirstWrite: threw an error - " << e.what() << ".\n";
            Log.LogString(ss.str(), LogError);
            Stop();
         }
      }
      else
      {
         ss << "SSLSocket::HandleFirstWrite: failed - " << error.message() << ".\n";
         Log.LogString(ss.str(), LogError);
         Stop();
      }
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::HandleFirstWrite: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

void SSLSocket::HandleRead(const boost::system::error_code& error, size_t bytesTransferred)
{
   // This method is called to process an incoming message.
   //
   std::stringstream ss;
   int ByteCount;
   try
   {
      // ss << "SSLSocket::HandleRead: From worker thread " << boost::this_thread::get_id() << ".\n";
      // Log.LogString(ss.str(), LogInfo);
      // Set to exit this thread if the user is done.
      if (!ReqAlive)
      {
         // IOService->stop();
         return;
      }
      if (!error)
      {
         // Get the number of bytes in the message.
         if (bytesTransferred == 4)
         {
            ByteCount = BytesToInt(pDataBuf);
         }
         else
         {
            // Call the C# callback method that will handle the message.
            ss << "SSLSocket::HandleRead: From worker thread " << boost::this_thread::get_id() << "; # bytes transferred = " << bytesTransferred << ".\n";
            Log.LogString(ss.str(), LogDebug2);
            if (bytesTransferred > 0)
            {
               Log.LogBuf(pDataBuf, (int)bytesTransferred, true, LogDebug3);
               Log.LogString("SSLSocket::HandleRead: sending msg to the C# client.\n\n", LogDebug2);
               CallbackFunction(this, bytesTransferred, (void*)pDataBuf);
            }
            else
            {
               // # of bytes transferred = 0.  Don't do anything.
               bytesTransferred = 0; // For debugging.
            }
            // Prepare to read in the next message length.
            ByteCount = MsgLenBytes;
         }
         pDataBuf = BufMang.GetPtr(ByteCount);
         boost::system::error_code Err;
         try
         {
            boost::asio::async_read(*pSocket, boost::asio::buffer(pDataBuf, ByteCount), boost::bind(&SSLSocket::HandleRead,
               this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
         }
         catch (std::exception& e)
         {
            stringstream ss;
            ss << "SSLSocket::HandleRead: threw this error - " << e.what() << ".\n";
            Log.LogString(ss.str(), LogError);
         }
      }
      else
      {
         Log.LogString("SSLSocket::HandleRead failed: " + error.message() + "\n", LogError);
         Stop();
      }
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::HandleRead: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

void SSLSocket::Stop()
{
   // This method calls the shutdown method on the socket in order to stop reads or writes that might be going on.  If this is not done, then an exception will be thrown
   // when it comes time to delete this object.
   //
   boost::system::error_code EC;
   try
   {
      // This method can be called from the handler as well.  So once the ShuttingDown flag is set, don't go through the same code again.
      if (ShuttingDown)
         return;
      LockCode->Acquire(); // Single thread the code.
      if (!ShuttingDown)
      {
         ShuttingDown = true;
         pSocket->next_layer().cancel();
         pSocket->shutdown(EC);
         if (EC)
         {
            stringstream ss;
            ss << "SSLSocket::Stop: socket shutdown error - " << EC.message() << ".\n";
            Log.LogString(ss.str(), LogError);
         }
         else
         {
            pSocket->next_layer().close();
         }
         delete pSocket;
         pSocket = 0;
         ReqAlive = false;
         SetEvent(hEvent);
         IOService->stop();
         LobbySocketOpen = false;
         WorkerThreads.join_all();
      }
      LockCode->Release();
      delete LockCode;
      LockCode = 0;
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::Stop: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

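So, in answer to your question about whether you have to use a queue: in your comment to Xaqq, you said "I need to exchange messages between the two threads." Using a container such as a queue is how messages can be passed to another thread for processing. If you don't like the STL containers, Boost does have some. As far as I know, there is no internal Boost ASIO container that can be accessed. Storing and passing the messages around is something you have to do in your own code.

One last note about the io_service::run call. It only blocks while there is work to do. See this link. In my sample code above, a work item is added to the io_service object before the run method is called, so it blocks indefinitely, which is what I want. If I really wanted only one thread, I would probably set up the worker thread to call the run method with a work object so that it blocks indefinitely. That thread would handle all asynchronous I/O coming from and going to the server. Inside the class, I would write an interface method or two so that the GUI can send data to the server. These methods could use the async write rather than the synchronous write method and would therefore return right away, so your GUI won't block for long. You would need to write a HandleWrite method. Mine does not do much: it just logs an error if one occurs.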

answered 2013-07-31T20:34:52.483