我目前正在处理一个相当大的数据处理任务,需要拆分和分配处理。我有一个使用 ZeroMQ 的 PUSH / PULL 机制的完整工作管道原型——“处理”数据正在发送并完成。
但是,一旦数据被扇出并且处理完成,我需要指示工作人员(在 while/true 循环中运行)终止。我认为为此使用 ZMQ 的 PUB/SUB 机制是明智的。但是,工作人员没有收到任何控制消息。
令人沮丧的是,如果我切换到 PUSH/PULL 机制,工作人员会收到消息并正确终止。
我在这里从原型中复制了一些代码:
您可以看到生产者(向工作人员发送数据的位)创建并绑定到控制套接字(第 19 行),工作人员也绑定了该套接字(第 53 行)。一旦生产者完成工作,它通过控制套接字发送终止命令,工作人员继续循环并检查控制消息。不幸的是,第 75 行永远不会测试为真,并且该过程继续循环。
// Control socket create / bind from producer
$this->controlSocket = $this->zmqContext->getSocket(ZMQ::SOCKET_PUB);
$this->controlSocket->bind('tcp://172.0.0.1:51001');
// Control socket create / bind from worker
$control = $this->getZmqContext()->getSocket(ZMQ::SOCKET_SUB);
$control->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
$control->connect('tcp://172.0.0.1:51001');