3

我稍微修改了 ZeroMQ 指南中的 helloworld 服务器程序 ( hwserver.pl ),以使用 AnyEvent 实现它。然而,在 REQ/REP 的两次迭代之后,程序挂起。有人能弄清楚为什么吗?

这是服务器:

#!/usr/bin/perl -w

use strict;
use warnings;
use 5.12.0;

use EV;
use AnyEvent;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw/ ZMQ_REP ZMQ_FD /;

my $context = zmq_init();

my $responder = zmq_socket($context, ZMQ_REP);
my $fh = zmq_getsockopt( $responder, ZMQ_FD );
zmq_bind($responder, 'tcp://*:5555');

our $w; $w = AE::io $fh, 0, sub {
    say "Receiving...";
    zmq_recv($responder, my $buf, 1_000_000);
    say "Received request: [$buf]";
    sleep(1);
    zmq_msg_send('World', $responder);
    sleep(1);
};

EV::run;

这是客户:

#!/usr/bin/perl -w

use strict;
use warnings;
use 5.12.0;

use ZMQ::LibZMQ3;
use ZMQ::Constants qw(ZMQ_REQ);

my $context = zmq_init();

# Socket to talk to server
say 'Connecting to hello world server...';
my $requester = zmq_socket($context, ZMQ_REQ);
zmq_connect($requester, 'tcp://localhost:5555');

for my $request_nbr (0..9) {
    say "Sending request $request_nbr...";
    zmq_msg_send('Hello', $requester);
    my $msg = zmq_msg_init();
    say "Receiving...";
    zmq_msg_recv($msg, $requester);
    say "Received reply $request_nbr: [". zmq_msg_data($msg) ."]";
}

这是服务器的输出:

Receiving...
Received request: [Hello]
Receiving...
Received request: [Hello]

这是客户端的输出:

Connecting to hello world server...
Sending request 0...
Receiving...
Received reply 0: [World]
Sending request 1...
Receiving...
Received reply 1: [World]
Sending request 2...
Receiving...

怎么了?

4

2 回答 2

3

免责声明:我之前并没有真正使用过 ZeroMQ

首先注意两点:

  1. 即使 client.pl 尚未启动,服务器也会打印“Receiving...”。这意味着即使没有任何内容发送到此套接字,也会调用 io watcher 回调
  2. 在服务器中,如果您用简单的 while(1) 替换设置 IO 观察程序 (AE::io) 的行,则服务器/客户端可以正常工作。这意味着当我们应该从客户端接收消息时,IO 回调没有被正确触发(这使得它“挂起”)。

我很确定您遇到了http://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/中描述的问题。简而言之:

  1. 即使 zmq 文件句柄被通知准备好读取它并不意味着有消息等待接收。

    回调应检查这是否不是“误报”。

    unless (ZMQ_POLLIN & zmq_getsockopt( $responder, ZMQ_EVENTS ) ) {
        say "Nothing to recv from socket, skipping";
        return;
    }
    

    在回调开始时似乎可以解决问题。但我认为这在这里并不重要。这个问题只会让你在数据存在之前调用 zmq_recv,所以它会让你的程序更加阻塞

  2. 即使收到多条消息,IO watcher 也可能只被调用一次。这在上面链接的“ZeroMQ 使用边缘触发”段落中的文章中有很好的描述。这就是为什么你应该在回调内部循环,直到没有更多消息。您可以通过以下方式以非阻塞方式执行此操作:

    while (my $msg = zmq_recvmsg($responder,ZMQ_DONTWAIT)) {
        say "Received request: [".zmq_msg_data($msg)."]";
        zmq_msg_send('World', $responder);
    }
    

    此循环将处理所有待处理的消息,如果没有更多消息则离开。

    UPDATE链接文章建议在循环中接收消息,而 getsockopt for ZMQ_EVENTS 报告 ZMQ_POLLIN 已设置。这对我来说似乎更优雅。我没有测试它,不知道是否有实际差异。

    我不知道您的场景中,服务器如何同时接收到多条消息,但是仅上面的代码更改似乎就可以解决问题。可能我对 ZeroMQ 了解的不够多。如果有人可以向我解释这种特定情况下的“原因”,我会很高兴。

于 2013-08-21T11:56:07.403 回答
3

您是否有理由不使用文档为您的服务器建议的 while() 语法?

http://search.cpan.org/~dmaki/ZMQ-LibZMQ3-1.00_04/lib/ZMQ/LibZMQ3.pm

从 zeromq2-2.1.0 开始,您可以使用 getsockopt 检索底层文件描述符,因此使用它来集成 ZMQ::LibZMQ3 和 AnyEvent:

my $socket = zmq_socket( $ctxt, ZMQ_REP );
my $fh = zmq_getsockopt( $socket, ZMQ_FD );
my $w; $w = AE::io $fh, 0, sub {
    while ( my $msg = zmq_recv( $socket, ZMQ_RCVMORE ) ) {
        # do something with $msg;
    }
    undef $w;
};

此外,睡眠将被阻塞。您确实应该注册一个与计时器事件相关的回调,该事件设置为在一秒钟后运行,以发送响应。

于 2013-08-20T13:59:03.463 回答