5

考虑下面的示例代码(作为示例,我快速输入它,如果有错误也没关系 - 我对理论感兴趣)。

bool shutDown = false; //global

int main()
{
  CreateThread(NULL, 0, &MessengerLoop, NULL, 0, NULL);
  //do other programmy stuff...
}


DWORD WINAPI MessengerLoop( LPVOID lpParam )
{
  zmq::context_t context(1);
  zmq::socket_t socket (context, ZMQ_SUB);
  socket.connect("tcp://localhost:5556");
  socket.setsockopt(ZMQ_SUBSCRIBE, "10001 ", 6);

  while(!shutDown)
  {
    zmq_msg_t getMessage;
    zmq_msg_init(&getMessage);
    zmq_msg_recv (&getMessage, socket, 0); //This line will wait forever for a message
    processMessage(getMessage); 
  }
}

创建一个线程来等待传入的消息并适当地处理它们。线程正在循环,直到shutDown设置为 true。

在 ZeroMQ 中,指南明确说明了必须清理的内容,即消息、套接字和上下文。

我的问题是:由于recv将永远等待消息,阻塞线程,如果从未收到消息,我该如何安全地关闭该线程?

4

4 回答 4

11

阻塞调用将以几种方式退出。首先,这取决于您的语言和绑定,中断(Ctrl-C、SIGINT、SIGTERM)将退出调用。您将返回(再次,取决于您的绑定)错误或空消息(libzmq 返回 EINTR 错误)。

其次,如果您在另一个线程中终止上下文,阻塞调用也将退出(libzmq 返回 ETERM 错误)。

第三,您可以在套接字上设置超时,以便在没有数据的情况下在任何情况下都会在超时后返回。我们不经常这样做,但在某些情况下它可能很有用。

最后,我们在实践中所做的绝不是阻塞接收,而是使用 zmq_poll 找出套接字何时有消息等待,然后从这些套接字接收。这就是您向外扩展以处理更多套接字的方式。

于 2013-04-25T12:38:03.393 回答
2

您可以使用非阻塞调用标志ZMQ_DONTWAIT

  while(!shutDown)
  {
    zmq_msg_t getMessage;
    zmq_msg_init(&getMessage);
    while(-1 == zmq_msg_recv(&getMessage, socket, ZMQ_DONTWAIT))
    {
      if (EAGAIN != errno || shutDown)
      {
        break;
      }
      Sleep(100);
    }
    processMessage(getMessage); 
  }
于 2013-04-25T12:31:26.043 回答
0

假设您有一个类 SUB(订阅者)来管理您的 ZMQ 消息的接收。在主函数/类的析构函数或退出函数中,调用以下命令:

pub->close();

///
/// Close the publish context
///
void PUB::close()
{
    zmq_close (socket);
    zmq_ctx_destroy (context);
}

这将使“recv”阻塞以您可以忽略的错误消息终止。应用程序将以正确的方式轻松退出。这是正确的方法。祝你好运!

于 2018-01-12T15:50:05.113 回答
0

每当 zmq 上下文被销毁时,zmq_msg_recv 都会收到 -1。我在所有代码中都使用它作为终止条件。

while (!shutdown)
{
    ..
    ..
    int rc = zmq_msg_recv (&getMessage, socket, 0);
    if (rc != -1)
    {
      processMessage;
    }
    else 
      break;
}

请记住在 main() 的末尾销毁 zmq 上下文以进行适当的清理。

zmq_ctx_destroy(zctx); 
于 2017-03-08T01:12:04.027 回答