我正在尝试从示例 wuclient/wuserver 在 zeromq 上实现一个惰性订阅者。客户端比服务器慢得多,因此它必须只获取服务器最后发送的消息。
到目前为止,我发现这样做的唯一方法是连接/断开客户端,但每个连接当然会产生不必要的成本,大约 3 毫秒:
服务器.cxx
int main () {
// Prepare our context and publisher
zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
publisher.bind("tcp://*:5556");
int counter = 0;
while (1) {
counter++;
// Send message to all subscribers
zmq::message_t message(20);
snprintf ((char *) message.data(), 20 ,
"%d", counter);
publisher.send(message);
std::cout << counter << std::endl;
usleep(100000);
}
return 0;
}
客户端.cxx
int main (int argc, char *argv[])
{
zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
while(1){
zmq::message_t update;
int counter;
subscriber.connect("tcp://localhost:5556"); // This call take some milliseconds
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
subscriber.recv(&update);
subscriber.disconnect("tcp://localhost:5556");
std::istringstream iss(static_cast<char*>(update.data()));
iss >> counter;
std::cout << counter << std::endl;
usleep(1000000);
}
return 0;
}
服务器输出:1 2 3 4 5 6 7 8 9 ...
客户端输出:4 14 24 ...
我尝试在没有 co/deco 的情况下使用高水位标记来做到这一点,但它不起作用。即使使用这种代码,只有在缓冲区达到至少数百条消息时才会开始丢弃帧。:
int high_water_mark = 1;
socket.setsockopt(ZMQ_RCVHWM, &high_water_mark, sizeof(high_water_mark) );
socket.setsockopt(ZMQ_SNDHWM, &high_water_mark, sizeof(high_water_mark) );
zeromq-dev 中也有这篇文章密切相关,但提供的解决方案(使用另一个线程来选择最后一条消息是不可接受的,我无法通过网络传输大量消息,之后将不会使用。