25

我有一个使用 ZeroMQ 进行消息传递的 C++ 应用程序。但它还必须为基于 AJAX / Comet 的 Web 服务提供 SGCI 连接。

为此,我需要一个普通的 TCP 套接字。我可以通过普通的 Posix 套接字来做到这一点,但是为了保持跨平台的便携性并使我的生活更轻松(我希望......)我正在考虑使用 Boost::ASIO。

但是现在我遇到了 ZMQ 想要使用它自己zmq_poll()和 ASIO的冲突io_service.run()......

有没有办法让 ASIO 与 0MQ 一起工作zmq_poll()

还是有其他推荐的方法来实现这样的设置?

注意:我可以通过使用多个线程来解决这个问题——但它只是一个小的单核/CPU 盒子,它会以非常低的 SCGI 流量运行该程序,所以多线程会浪费资源......

4

5 回答 5

16

阅读此处此处的文档后,特别是本段

ZMQ_FD:检索与套接字关联的文件描述符 ZMQ_FD 选项应检索与指定套接字关联的文件描述符。返回的文件描述符可用于将套接字集成到现有的事件循环中;ØMQ 库应通过使文件描述符准备好读取,以边缘触发的方式通知套接字上的任何未决事件。

我认为您可以使用null_buffersfor zmq_pollitem_teach 并将事件循环推迟到 a io_service,完全绕过zmq_poll()。然而,上述文档中似乎有一些警告,特别是

从返回的文件描述符中读取的能力并不一定表明消息可以从底层套接字读取或写入;应用程序必须通过随后检索 ZMQ_EVENTS 选项来检索实际事件状态。

因此,当您的一个 zmq 套接字的处理程序被触发时,您必须在处理我认为的事件之前做更多的工作。未编译的伪代码如下

const int fd = getZmqDescriptorSomehow();
boost::asio::posix::stream_descriptor socket( _io_service, fd );
socket->async_read_some(
    boost::asio::null_buffers(),
    [=](const boost::system::error_code& error)
    {
       if (!error) {
           // handle data ready to be read
       }
     }
);

请注意,您不必在这里使用 lambda,boost::bind成员函数就足够了。

于 2012-10-11T22:13:27.300 回答
2

最后我发现有两种可能的解决方案:

  • Sam Miller 的我们使用 ASIO 的事件循环的地方
  • .native()ZeroMQ 的事件循环,通过方法获取 ASIO 文件描述符acceptor并将socket它们插入到数组中zmq_pollitem_t

我已经接受了 Sam Miller 的回答,因为这对我来说是 SCGI 案例中的最佳解决方案,其中不断创建和结束新的连接。因此,处理每一个变化的zmq_pollitem_t数组是一个很大的麻烦,可以通过使用 ASIO 事件循环来避免。

于 2012-10-14T16:09:49.067 回答
2

获得 ZeroMQ 的套接字是战斗中最小的部分。ZeroMQ 基于 TCP 上的分层协议,因此如果您走这条路线,您将不得不在自定义 Boost.Asio io_service 中重新实现 ZeroMQ。我在使用 Boost.Asio 创建异步ENet服务时遇到了同样的问题,首先尝试使用 Boost.Asio UDP 服务从 ENet 客户端捕获流量。ENet 是一个在 UDP 上分层的类似 TCP 的协议,所以我当时所做的只是捕获处于几乎无用状态的数据包。

Boost.Asio 是基于模板的,内置的 io_service 使用模板基本封装了系统套接字库来创建 TCP 和 UDP 服务。我的最终解决方案是创建一个自定义 io_service 来包装 ENet 库而不是系统套接字库,从而允许它使用 ENet 的传输功能,而不必使用内置的 UDP 传输重新实现它们。

ZeroMQ 也可以这样做,但是 ZeroMQ 本身已经是一个非常高性能的网络库,它已经提供了异步 I/O。我认为您可以通过使用 ZeroMQ 现有的 API 接收消息并将消息传递到 io_service 线程池来创建一个可行的解决方案。这样消息/任务仍将使用 Boost.Asio 的反应器模式异步处理,而无需重新编写任何内容。ZeroMQ 将提供异步 I/O,Boost.Asio 将提供异步任务处理程序/工作者。

现有的 io_service 仍然可以耦合到现有的 TCP 套接字,允许线程池处理 TCP(在您的情况下为 HTTP)和 ZeroMQ。在这样的设置中,ZeroMQ 任务处理程序完全有可能访问 TCP 服务会话对象,从而允许您将 ZeroMQ 消息/任务的结果发送回 TCP 客户端。

以下只是为了说明这个概念。

// Create a pool of threads to run all of the io_services.
std::vector<boost::shared_ptr<boost::thread> > threads;
for(std::size_t i = 0; i < thread_pool_size_; ++i) {
    boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_)));
    threads.push_back(thread);
}

while (1) {
    char buffer [10];
    zmq_recv (responder_, buffer, 10, 0);
    io_service_.post(boost::bind(&server::handle_zeromq_message, buffer, this));
}
于 2014-02-10T05:44:01.610 回答
2

在这个问题发生 2 年后,有人发布了一个正是这样做的项目。该项目在这里:https ://github.com/zeromq/azmq 。讨论设计的博客文章在这里:https ://rodgert.github.io/2014/12/24/boost-asio-and-zeromq-pt1/ 。

以下是从自述文件中复制的示例代码:

#include <azmq/socket.hpp>
#include <boost/asio.hpp>
#include <array>

namespace asio = boost::asio;

int main(int argc, char** argv) {
    asio::io_service ios;
    azmq::sub_socket subscriber(ios);
    subscriber.connect("tcp://192.168.55.112:5556");
    subscriber.connect("tcp://192.168.55.201:7721");
    subscriber.set_option(azmq::socket::subscribe("NASDAQ"));

    azmq::pub_socket publisher(ios);
    publisher.bind("ipc://nasdaq-feed");

    std::array<char, 256> buf;
    for (;;) {
        auto size = subscriber.receive(asio::buffer(buf));
        publisher.send(asio::buffer(buf));
    }
    return 0;
}

看起来不错。如果您尝试,请在评论中告诉我它是否在 2019 年仍然有效[我可能会在几个月内尝试,然后更新此答案](回购已过时,最后一次提交是一年前)

于 2019-04-04T14:46:33.353 回答
0

解决方案是轮询您的 io_service 而不是 run()。

查看此解决方案以获取一些poll()信息。

使用 poll 而不是 run 将允许您轮询 zmq 的连接,而不会出现任何阻塞问题。

于 2012-10-11T12:58:35.330 回答