4

我使用 ZMQ 的总体目标是避免陷入异步消息传递的麻烦;ZMQ 似乎是一个可移植且实用的解决方案。然而,大多数 ZeroMQ 文档,例如 this,以及我在 Google 上搜索过的许多其他 zmq 示例都是基于helloworld.c格式的。也就是里面都是简单的程序代码int main(){}

我的问题是我想在类 c++ 单例类中“嵌入”一个 zmq“监听器”。我想“收听”消息然后处理它们。我打算在重要的情况下使用 zmq 的PUSH->PULL套接字。我不知道该怎么做是在内部“事件循环”中。

class foomgr {
    public:
        static foomgr& get_foomgr();
    // ...
    private:
        foomgr();
        foomgr(const &foomgr);
    // ...
        listener_() {
            // EVENT LOOP HERE
            // RECV and PROCESS ZMQ MSGS
            // while(true) DOES NOT WORK HERE
        }
    // ...
        zmq::context_t zmqcntx_;
        zmq::socket_t zmqsock_;
        const int zmqsock_linger_ = 1000;
    // ....
}

我显然不能while(true)在侦听器中使用该构造,因为无论我从哪里调用它都会阻塞。由于使用 ZMQ 的优点之一是我不必自己管理“侦听器”线程,因此必须弄清楚如何创建自己的线程来包装 listener_ 似乎很愚蠢。我迷失了解决方案。

注意:我是一个 c++ 新手,所以对大多数人来说可能很明显的事情对我来说不是。此外,我正在尝试使用通用“单词”,而不是库或特定语言以避免混淆。代码是用 -std=c++11 构建的,所以这些结构很好。

4

2 回答 2

4

ZMQ C++ 库没有实现消息轮询的侦听器模式。它将这项任务留给你来包装你自己的类。但是,它确实支持轮询新消息的非阻塞模式。

因此,使用正确的代码,您可以以非阻塞方式将其包装在一个小循环中。

在 GitHub 上查看这个用 C++ 编写的轮询示例。请注意,它从 2 个套接字轮询,因此您需要稍微修改它以删除额外的代码。

您需要包含在自己的观察者实现中的重要部分如下:

zmq::message_t message;
zmq::poll (&items [0], 2, -1);

if (items [0].revents & ZMQ_POLLIN) {
    receiver.recv(&message);
    //  Process task
}
于 2013-11-15T05:34:38.343 回答
1

Zmq 在设计上不是线程安全的(到目前为止的版本)。事实上,Zmq 强调:

不要使用或关闭套接字,除非在创建它们的线程中。时期。

不应使用回调,因为调用回调的线程肯定与创建套接字的线程不同,这是被禁止的。

也许,您会发现有用的zmqHelper,一个小型库(只有两个类和几个函数),可以更轻松地在 C++ 中使用 Zmq 并强制(保证)线程不能共享套接字。

在示例部分中,您将了解如何执行最常见的任务。

希望能帮助到你。

代码片段:在 ROUTER-DEALER 代理中使用 zmqHelper 进行轮询。

zmq::context_t theContext {1}; // 1 thread in the socket 
SocketAdaptor< ZMQ_ROUTER > frontend_ROUTER {theContext};
SocketAdaptor< ZMQ_DEALER > backend_DEALER {theContext};

frontend_ROUTER.bind ("tcp://*:8000");
backend_DEALER.bind ("tcp://*:8001");

while (true) {

  std::vector<std::string> lines;

  // 
  //  wait (blocking poll) for data in any socket
  // 
  std::vector< zmqHelper::ZmqSocketType * > list
    = {  frontend_ROUTER.getZmqSocket(),  backend_DEALER.getZmqSocket() };

  zmqHelper::ZmqSocketType *  from = zmqHelper::waitForDataInSockets ( list );

  // 
  //  there is data, where is it from?
  // 
  if ( from ==  frontend_ROUTER.getZmqSocket() ) {
    // from frontend, read ...
    frontend_ROUTER.receiveText (lines);

    // ... and resend
    backend_DEALER.sendText( lines );
  }
  else if ( from ==  backend_DEALER.getZmqSocket() ) {
    // from backend, read ...
    backend_DEALER.receiveText (lines);

    // ... and resend
    frontend_ROUTER.sendText( lines );
  } 
  else if ( from == nullptr ) {
    std::cerr << "Error in poll ?\n";
  }

} // while (true)
于 2016-10-14T15:10:55.257 回答