4

我目前正在处理一个相当大的数据处理任务,需要拆分和分配处理。我有一个使用 ZeroMQ 的 PUSH / PULL 机制的完整工作管道原型——“处理”数据正在发送并完成。

但是,一旦数据被扇出并且处理完成,我需要指示工作人员(在 while/true 循环中运行)终止。我认为为此使用 ZMQ 的 PUB/SUB 机制是明智的。但是,工作人员没有收到任何控制消息。

令人沮丧的是,如果我切换到 PUSH/PULL 机制,工作人员会收到消息并正确终止。

我在这里从原型中复制了一些代码:

http://pastebin.com/myZyWQ1E

您可以看到生产者(向工作人员发送数据的位)创建并绑定到控制套接字(第 19 行),工作人员也绑定了该套接字(第 53 行)。一旦生产者完成工作,它通过控制套接字发送终止命令,工作人员继续循环并检查控制消息。不幸的是,第 75 行永远不会测试为真,并且该过程继续循环。

// Control socket create / bind from producer
$this->controlSocket = $this->zmqContext->getSocket(ZMQ::SOCKET_PUB);
$this->controlSocket->bind('tcp://172.0.0.1:51001');

// Control socket create / bind from worker
$control = $this->getZmqContext()->getSocket(ZMQ::SOCKET_SUB);
$control->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
$control->connect('tcp://172.0.0.1:51001');
4

1 回答 1

6

是的,您可能遇到了“慢加入者”问题的一个实例。PUB 不会阻止,因此如果没有人可以发送消息,它会丢弃消息。因为您创建了套接字并立即发送,所以订阅者仍将连接 - 这需要非零时间,因此消息会丢失。尝试一开始就创建控制套接字,应该让连接有机会稳定下来(或者,在控制套接字中的 bind() 和 send() 之间放置一点睡眠)。有关更详细的讨论,请参阅 ZGuide:http://zguide.zeromq.org/page:all 搜索“slow joiner”)。

我注意到的另一件事是您正在检查投票之外的控制套接字。poll 的目的是让你做那种事情——将它作为另一个 POLL_IN 添加到你已经创建的 pollset 中,并检查 read() 中返回的套接字是否是 === (注意,三等号!)控制套接字或数据套接字。这样,您将获得更好的响应时间。

于 2012-12-06T13:31:38.183 回答