1

ZeroMQ 相当新。我有一个简单的 REQ/REP 队列,如下所示。我正在使用 PHP,但这并不重要,因为任何语言绑定对我来说都可以。这是客户端请求任务

$ctx = new ZMQContext();
$req = new ZMQSocket($ctx, ZMQ::SOCKET_REQ);
$req->connect('tcp://localhost:5454');
$req->send("Export Data as Zip");
echo $i . ":" . $req->recv().PHP_EOL;

这是一个真正执行任务的工人。

$ctx = new ZMQContext();
$srvr = new ZMQSocket($ctx, ZMQ::SOCKET_REP);
$srvr->bind("tcp://*:5454");
echo "Server is started at port $port" . PHP_EOL;
while(true)
{
    $msg = $srvr->recv();
    echo "Message = " . $msg . PHP_EOL;
    // Do the work here, takes 10 min, knows the count of lines added and remaining
    $srvr->send($msg . " is exported as zip file" . date('H:i:s'));
}

由于导出数据的任务大约需要 10 分钟,我想从不同的客户端连接到服务器并获得任务完成的进度/百分比。我想知道这是否是一种有效的方法。

我在 REQ/REP 部分工作的地方尝试了这种方法,但在 PUB/SUB 部分我什么也没得到

服务器部分

$ctx = new ZMQContext();
$srvr = new ZMQSocket($ctx, ZMQ::SOCKET_REP);
$srvr->bind("tcp://*:5454");

// add PUB socket to publish progress
$c = new ZMQContext();
$p = new ZMQSocket($c, ZMQ::SOCKET_PUB);
$p->bind("tcp://*:5460");

echo "Server is started at port 5454" . PHP_EOL;
$prog = 0;
while(true)
{
    $p->send($prog++ . '%'); // this part doesn't get to the progress client
    $msg = $srvr->recv();
    echo "Message = " . $msg . PHP_EOL;
    sleep(2);// some long task
    $srvr->send($msg . " Done zipping " . date('H:i:s'));
}

进度客户端

$ctx = new ZMQContext();
$stat = new ZMQSocket($ctx, ZMQ::SOCKET_SUB);
$stat->connect('tcp://localhost:5460');
while (true){
    echo $stat->recv() . PHP_EOL; //nothing shows here
}

请求客户

$ctx = new ZMQContext();
$req = new ZMQSocket($ctx, ZMQ::SOCKET_REQ);
$req->connect('tcp://localhost:5454');
for($i=0;$i<100;$i++){
    $req->send("$i : Zip the file please");
    echo $i . ":" . $req->recv().PHP_EOL; //works and get the output
}
4

2 回答 2

1

这个概念是可行的,需要一些调整:

所有 PUB 交易对手都必须通过至少一个空订阅来设置任何非默认订阅,.setsockopt( ZMQ_SUBSCRIBE, "" )这意味着接收所有 TOPIC(没有“过滤”-ed out)。

接下来,PUB 端和 SUB 端都应该进行.setsockopt( ZMQ_CONFLATE, 1 )配置,因为一旦唯一的值在“最后一个”、最近信息。

始终,应该首选 ZeroMQ 调用的非阻塞模式(.recv( ..., flags = ZMQ_NOBLOCK )等),或者 Poller.poll()应该先使用预测试来嗅探消息的(不)存在,然后再花更多精力从“从" ZeroMQ 上下文管理器。简而言之,阻塞模式的服务调用可以在生产级系统中很好地服务的情况并不多。

此外,一些进一步的调整可能会帮助 PUB 方,以防更大规模的“攻击”来自不受限制的 SUB 方实体池,并且 PUB 必须为这些(不受限制的)交易对手中的每一个创建/管理/维护资源。

于 2017-07-07T10:33:09.397 回答
0

如果有多个客户端想要接收相同的进度更新,您只需要使用 PUB/SUB。只需使用 PUSH/PULL 即可通过 tcp 进行简单的点对点传输。

哲学讨论

解决此类问题有两种方法。

  1. 使用额外的套接字来传达额外的消息类型,
  2. 仅使用两个套接字,但通过它们传递不止一种消息类型

你说的是做1)。可能值得考虑 2),但我必须强调,我对 PHP 几乎一无所知,因此不知道是否有语言特性鼓励人们拥有单独的请求和进度客户端。

如果你这样做了,你的原始客户端需要一个循环(在它发送请求之后)来接收多条消息,要么是进度更新消息,要么是最终结果。您的服务器在进行 10 分钟查找时,将定期发送进度更新消息,并在最后发送最终结果消息。您可能会使用 PUSH/PULL 客户端到服务器,并且从服务器返回到客户端的进度/结果再次相同。

遵循 2) 在架构上更加灵活。一旦您有办法通过单个套接字发送两种或多种消息类型并在接收端解码它们,您就可以发送更多。例如,您可以决定将“取消”消息从客户端添加到服务器,或者将部分结果消息从服务器添加回客户端。这比仅仅因为您想在客户端和服务器之间添加另一个消息流而继续向架构中添加更多套接字要容易得多。再说一次,我对 PHP 的了解还不够,不能说这绝对是用那种语言做它的正确方法。这在 C、C++ 中当然很有意义。

我发现 Google Protocol Buffers(我更喜欢 ASN.1)之类的东西对这类事情非常有用。这些允许您定义要发送的消息类型,并且(至少使用 GPB)将它们组合在一个“oneof”中(在 ASN.1 中,使用标记来区分不同的消息)。GPB 和 ASN.1 很方便,因为这样您就可以在系统中使用不同的语言、操作系统和平台,而不必真正担心发送的内容。并且是二进制(不是文本),它们在网络连接中的效率更高。

于 2017-07-07T20:32:37.073 回答