54

我对 poller 在 zmq 中实际做了什么感到困惑。zguide 对其进行了最低限度的介绍,并且仅将其描述为一种从多个套接字读取的方式。这对我来说不是一个令人满意的答案,因为它没有解释如何拥有超时套接字。我知道zeromq:如何防止无限等待?解释推/拉,但不解释 req/rep 模式,这是我想知道如何使用的。

我想问的是:轮询器如何工作,它的功能如何应用于跟踪套接字及其请求?

4

2 回答 2

71

当您需要在同一线程中侦听不同的套接字时,请使用轮询器:

ZMQ.Socket subscriber = ctx.socket(ZMQ.SUB)
ZMQ.Socket puller = ctx.socket(ZMQ.PULL)

使用轮询器注册套接字(POLLIN侦听传入消息)

ZMQ.Poller poller = ZMQ.Poller(2)
poller.register(subscriber, ZMQ.Poller.POLLIN)
poller.register(puller, ZMQ.Poller.POLLIN)

轮询时,使用循环:

while( notInterrupted()){
  poller.poll()

  //subscriber registered at index '0'
  if( poller.pollin(0)) 
     subscriber.recv(ZMQ.DONTWAIT)

  //puller registered at index '1'
  if( poller.pollin(1))
     puller.recv( ZMQ.DONTWAIT)
}

选择您想要的投票方式...

poller.poll()阻塞,直到任一套接字上有数据。
poller.poll(1000)阻塞 1s,然后超时。

轮询器在套接字上有可用数据(消息)时发出通知;阅读它是你的工作。

阅读时,不要阻塞:socket.recv( ZMQ.DONTWAIT). 即使poller.pollin(0)检查是否有要读取的数据,您也希望避免轮询循环内的任何阻塞调用,否则,您最终可能会由于“卡住”套接字而阻塞轮询器。

因此,如果将两条单独的消息发送到subscriber,您必须调用subscriber.recv()两次才能清除轮询器,否则,如果您调用subscriber.recv()一次,轮询器将继续告诉您还有另一条消息要读取。因此,本质上,轮询器跟踪消息的可用性和数量,而不是实际消息。

您应该浏览投票示例并使用代码,这是最好的学习方式。

这是否回答你的问题?

于 2013-08-08T02:11:33.750 回答
2

在这个答案中,我列出了

来自文档http://api.zeromq.org/4-1:zmq-poll的详细信息

此外,我还添加了一些重要的解释,以及为新人清除困惑的事情!如果你赶时间!您可能想从轮询器做什么和接收如何,以及关于接收说明开始,最后只有一个套接字部分!从重要说明部分开始!我在哪里深入了解事情!我仍然建议仔细阅读文档参考中的详细信息!还有第一节!

文档参考和注释

监听多个套接字和事件

zmq_poll() 函数为应用程序提供了一种机制,可以在一组套接字上以电平触发的方式多路复用输入/输出事件。items 参数指向的数组的每个成员都是一个 zmq_pollitem_t 结构。nitems 参数指定 items 数组中的项目数。zmq_pollitem_t结构定义如下:

typedef struct
{
    void //*socket//;
    int //fd//;
    short //events//;
    short //revents//;
} zmq_pollitem_t;

zmq 套接字或标准套接字通过 fd

对于每个 zmq_pollitem_t 项目,zmq_poll() 将检查socket引用的 ØMQ 套接字文件描述符fd指定的标准套接字,以查找 events 中指定的事件。如果 socket 和 fd 都设置在单个 zmq_pollitem_t中,则socket 引用的 ØMQ 套接字应优先,并且 fd 的值应被忽略。

大笔记(相同的上下文):

传递给 zmq_poll() 函数的所有 ØMQ 套接字必须共享相同的 ØMQ 上下文,并且必须属于调用 zmq_poll() 的线程。

会员

对于每个 zmq_pollitem_t 项,zmq_poll() 应首先清除 revents 成员,然后通过设置 revents 成员中与事件条件对应的位来指示已发生的任何请求事件。

成功完成后,zmq_poll() 函数应返回 zmq_pollitem_t 结构的数量,其中包含在 revents 中发出的事件,如果没有发出任何事件,则返回 0。

等待事件和阻塞

如果在任何 zmq_pollitem_t项目上没有发生任何请求的事件 zmq_poll ()等待超时微秒,以便在任何被请求的项目上发生事件。如果timeout 的值为 0zmq_poll() 将立即返回。如果timeout 的值为 -1,zmq_poll() 将无限期地阻塞,直到至少一个 zmq_pollitem_t 上发生了请求的事件。超时的分辨率为 1 毫秒

  • 0 => 不等待

  • -1 => 块

  • +val => 阻塞并等待超时量

活动

zmq_pollitem_t 的 events 和 revents 成员是通过 OR'ing 以下事件标志的组合构造的位掩码:

ZMQ_POLLIN

对于 ØMQ 套接字,至少可以从套接字接收到一条消息而不会阻塞。对于标准套接字,这相当于 poll() 系统调用的 POLLIN 标志,通常意味着可以从 fd 读取至少一个字节的数据而不会阻塞。

ZMQ_POLLOUT

对于 ØMQ 套接字,至少有一条消息可以不阻塞地发送到该套接字。对于标准套接字,这等效于 poll() 系统调用的 POLLOUT 标志,通常意味着可以将至少一个字节的数据写入 fd 而不会阻塞。

ZMQ_POLLERR

对于标准套接字,此标志通过 zmq_poll() 传递给底层的 poll() 系统调用,通常意味着 fd 指定的套接字上存在某种错误情况。对于 ØMQ 套接字,如果在事件中设置此标志无效,并且不会由 zmq_poll() 在 revents 中返回。

笔记:

zmq_poll() 函数可以使用 poll() 以外的操作系统接口来实现或模拟,因此可能会以本文档中未定义的方式受到这些接口的限制。

返回值

成功完成后,zmq_poll() 函数应返回 zmq_pollitem_t 结构的数量,其中包含在 revents 中发出的事件,如果没有发出任何事件,则返回 0。失败时,zmq_poll() 将返回 -1 并将 errno 设置为下面定义的值之一。

例子

无限期地轮询 0mq 套接字和标准套接字上的输入事件。

zmq_pollitem_t items [2];
/* First item refers to ØMQ socket 'socket' */
items[0].socket = socket;
items[0].events = ZMQ_POLLIN;
/* Second item refers to standard socket 'fd' */
items[1].socket = NULL;
items[1].fd = fd;
items[1].events = ZMQ_POLLIN;
/* Poll for events indefinitely */
int rc = zmq_poll (items, 2, -1);
assert (rc >= 0); /* Returned events will be stored in items[].revents */

重要笔记

轮询器做什么以及接收什么

轮询器仅检查并等待事件何时发生! POLLIN是用来接收的!有数据可以接收!然后我们应该通读recv()!我们有责任阅读或做任何事情!轮询器只是在那里收听事件并等待它们!并且通过 zmq_pollitem_t我们可以监听多个事件!如果发生任何事件!然后轮询器解除阻塞!然后我们可以在recv中检查事件!和 zmq_pollitem_t!请注意,轮询器在事件触发时将它们排队!下一个电话将从队列中挑选!因此,订单也被保留了!并且连续调用将返回下一个事件,依此类推!当他们进来的时候!

关于接收和仅一个插座的说明

对于路由器!一个路由器甚至可以从一个客户端接收多个请求!并且同时来自多个客户!在多个客户端具有相同性质的设置中!并且是连接到路由器的那些!一个新人可能会想到的问题是!对于这种异步性质,我是否需要轮询器!答案是否定的!无需轮询器并监听不同的套接字!

重要的是:接收调用(zmq_recv()、socket.recv() 一些 lang 绑定)!堵塞!并且是阅读的方式!消息来的时候!他们在排队!轮询器与此无关!轮询器只监听来自不同套接字的事件!如果其中任何一个发生,请取消阻止!如果达到超时,则不会发生任何事件!仅此而已!

接收的性质是直截了当的!接收呼叫阻塞!直到消息队列中的一条消息到来!当多人来时,他们将排队!然后在每次下一次调用 recv() 时!我们将拉下一条消息!或者框架!(取决于我们使用的接收方法!和api级别!以及从绑定库到低级别的抽象!)因为我们也可以按帧访问消息!每次通话一帧!但到了这里就清楚了!接电话就是要接的东西!他们阻止直到消息进入队列!多个并行消息!他们来了就会排队!然后每次通话!队列是否已满!消费它!或者等等!这是一件非常重要的事情!这会让新来的人感到困惑!

只有当有多个套接字时才需要轮询器!它们总是我们在相关进程代码上声明的套接字(绑定它们,或连接到某个东西)!因为如果没有!你将如何收到消息!你不能做好!因为你必须优先考虑一个或另一个!在一个有一个 recv() 的循环中先行!哪个会阻塞!即使另一个套接字在它的队列中收到一条消息!循环被阻塞,无法进行下一个recv()!因为轮询器给了我们能够解决这个问题的美丽!并与多个套接字一起工作!

while(true) {
    socket1.recv() // this will block

    socket2.recv() // this will have to wait till the first recieve! Even if messages come in in it's queue

使用轮询器:

While(true) {
    zmq_poll() // block till one of the socket events happen! If the event was POLLIN!
            // If any socket get a message to it's queue
            // This will unblock
    // then we check which type and which socket was
    if (condition socket 1) {
        // treat socket 1 request
    }

    if (condition socket 2) {
        // treat socket 2 request
    }
 
    // ...
}

您可以在本节的文档中查看真实代码(滚动到足以查看代码块,您也可以在所有不同的语言中看到)

知道轮询器只是通知有消息!如果是波林!

每次迭代中!如果许多事件已经触发,轮询器!Let's give the example of 10 messages recieved 5 in each socket!在这里,轮询器已经将事件排队!并且在每次下一次调用中为 9 次!马上解决!有问题的消息可以映射到哪个套接字(通过使用轮询器对象和意思!所以绑定库使它变得太简单和令人愉快)!然后正确的套接字块将进行接收调用!当它这样做时,它将消耗它的队列中的下一条消息

所以你不断循环每次都消耗一条消息当他们进来的时候!轮询器跟踪那里的进入顺序!通过订阅和选择收听的事件收到的应该是POLLIN !

然后每个套接字都有它的消息队列!并且每次接到电话从中拉出来!民意调查员追踪了他们!所以当轮询器解决时!可以确保套接字接收呼叫的消息

最后一个例子:服务器客户端模式

让我们以一台服务器(路由器)和许多连接到的客户端(经销商)为例!如下图所示!

在此处输入图像描述

问题:许多连接到同一个路由器!立即异步发送!布拉布拉布拉!在服务器端(路由器)!我需要轮询器!?很多新人,可能会认为是或质疑是否需要!是的,你猜对了!

大不!

为什么 ?因为在服务器(路由器)代码中!我们只有一个正在处理的套接字!我们绑定!客户端然后连接到它!到此为止!只有一个插座!并且所有 recv() 调用都在那个套接字上!那个套接字有它的消息队列!recv() 一个接一个地消费消息!异步无关紧要,它们是如何产生的!同样,轮询器仅在有多个套接字时才有效!因此具有处理来自多个套接字的消息的混合性质!如果不!然后一个套接字的一个 recv() 需要先去然后另一个!并且会挡住对方!不是一件好事(这是坏事)!

笔记

这个答案带来了一个很好的清理!此外,它还引用了具有良好突出显示的文档!还显示低级库(c lang)的代码!@rafflan的答案显示了一个带有绑定库的很棒的代码(似乎是 c#)!和一个很好的解释!如果你没有检查它,你必须!

于 2021-05-25T13:55:04.320 回答