1

我正在尝试让运行 ZMQ v 3.2.0 的本机 C 版本的设备与使用 pub/sub ZMQ 套接字使用 JeroMQ(纯 Java impl)构建的 Java 应用程序一起工作。但是,似乎 JeroMQ 在消息有效负载与 C 实现之前使用了不同的标志配置。IIRC,JeroMQ 旨在与 v 3.2.2 兼容,所以我不确定这是否是 JeroMQ 端口中的错误

我的 Java 测试代码类似于 psenvpub 示例: public class psenvpub {

public static void main (String[] args) throws Exception {
    // Prepare our context and publisher
    Context context = ZMQ.context(1);
    Socket publisher = context.socket(ZMQ.PUB);

    publisher.bind("tcp://*:15052");
    byte seq = 1;
    while( !Thread.currentThread().isInterrupted() ){
        byte[] message = new byte[8];
        message[0] = 0;
        message[1] = 0;
        message[2] = 0;
        message[3] = 0;
        message[4] = seq++;
        message[5] = 0;
        message[6] = 0;
        message[7] = 0;
        publisher.send(message);
        try{
            Thread.sleep(1000);
        }
        catch(Exception e ){
            break;
        }
    }
}
}

我正在为 Native C 端点使用 perl 脚本:

use strict;
use warnings;

use Vocollect::ZMQ::Context;
use ZMQ::Constants qw(ZMQ_SUB);

my $ctx = Vocollect::ZMQ::Context->new();
my $sock = $ctx->socket(ZMQ_SUB);
$sock->connect('tcp://localhost:15052');
$sock->subscribe('');

while (1) {
    my $msg = $sock->recv(10000);
    print "Received msg\n" if defined($msg);
}

当订阅者收到第一条消息时,由于 libzmq 源代码中的断言失败而崩溃:

Assertion failed: options.recv_identity (..\..\..\src\socket_base.cpp:990)

这是:

void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
    //  Test whether IDENTITY flag is valid for this socket type.
    if (unlikely (msg_->flags () & msg_t::identity))
        zmq_assert (options.recv_identity);

    //  Remove MORE flag.
    rcvmore = msg_->flags () & msg_t::more ? true : false;
}

对发送的数据包的wireshark 跟踪显示,JeroMQ pub/sub 和本机 C pub/sub 之间的握手序列和标志不同。使用 JeroMQ 或原生 C libzmq 的端点时,我没有看到任何问题。

4

0 回答 0