我正在尝试让运行 ZMQ v 3.2.0 的本机 C 版本的设备与使用 pub/sub ZMQ 套接字使用 JeroMQ(纯 Java impl)构建的 Java 应用程序一起工作。但是,似乎 JeroMQ 在消息有效负载与 C 实现之前使用了不同的标志配置。IIRC,JeroMQ 旨在与 v 3.2.2 兼容,所以我不确定这是否是 JeroMQ 端口中的错误
我的 Java 测试代码类似于 psenvpub 示例: public class psenvpub {
public static void main (String[] args) throws Exception {
// Prepare our context and publisher
Context context = ZMQ.context(1);
Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("tcp://*:15052");
byte seq = 1;
while( !Thread.currentThread().isInterrupted() ){
byte[] message = new byte[8];
message[0] = 0;
message[1] = 0;
message[2] = 0;
message[3] = 0;
message[4] = seq++;
message[5] = 0;
message[6] = 0;
message[7] = 0;
publisher.send(message);
try{
Thread.sleep(1000);
}
catch(Exception e ){
break;
}
}
}
}
我正在为 Native C 端点使用 perl 脚本:
use strict;
use warnings;
use Vocollect::ZMQ::Context;
use ZMQ::Constants qw(ZMQ_SUB);
my $ctx = Vocollect::ZMQ::Context->new();
my $sock = $ctx->socket(ZMQ_SUB);
$sock->connect('tcp://localhost:15052');
$sock->subscribe('');
while (1) {
my $msg = $sock->recv(10000);
print "Received msg\n" if defined($msg);
}
当订阅者收到第一条消息时,由于 libzmq 源代码中的断言失败而崩溃:
Assertion failed: options.recv_identity (..\..\..\src\socket_base.cpp:990)
这是:
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
// Test whether IDENTITY flag is valid for this socket type.
if (unlikely (msg_->flags () & msg_t::identity))
zmq_assert (options.recv_identity);
// Remove MORE flag.
rcvmore = msg_->flags () & msg_t::more ? true : false;
}
对发送的数据包的wireshark 跟踪显示,JeroMQ pub/sub 和本机 C pub/sub 之间的握手序列和标志不同。使用 JeroMQ 或原生 C libzmq 的端点时,我没有看到任何问题。