ZeroMQ ( 版本 - zeromq-4.1.6 ) PGM 多播数据包接收卡在两者之间,即使发送方仍在发送数据包而没有任何问题。
如果我们重新启动 Receiver,应用程序现在会收到数据包,但这不是解决方案。我在发送方和接收方都尝试了各种ZMQ_RATE
。
问题:
发送方使用以下套接字选项发送了近 300,000 个数据包,但接收方卡在中间并且没有接收到所有数据包。如果我们Sleep( 2 )
在每次发送中添加 - 等待 2 毫秒,有时我们会收到所有数据包,但这需要更多时间。
环境设置:
(发送器和接收器使用 D-Link 交换机连接在单个子网内。媒体速度为 1Gbps)
Sender: JZMQ ( ZMQ C library, openPGM )
ZMQ_RATE - 30Mbps ( Megabits per second )
Packet size - 1024 bytes
ZMQ_RECOVERY_IVL - 2 Minutes
Send Flag - 0 ( blocking mode )
Sleep( 2ms ) - sometimes its working without any issue but taking more time for transfer.
Platform - Windows
Receiver: ZMQ C++ ( ZMQ C library, openPGM )
ZMQ_RATE - 30Mbps ( Megabits per second )
ZMQ_RCVTIMEO - 3 Secs
receive Flag - 0 ( blocking mode )
Platform - Windows
可能是什么问题?
ZeroMQ PGM-multicast 不是一个稳定的库吗?
JZMQ Sender:
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.PUB);
socket.setRate(80000);
socket.setRecoveryInterval(60*60);
socket.setSendTimeOut(-1);
socket.setSendBufferSize(1024*64);
socket.bind("pgm://local_IP;239.255.0.20:30001");
byte[] bytesToSend = new byte[1024];
int count = 0;
while(count < 300000) {
socket.send(bytesToSend, 0);
count++;
}
------------------------------------------------
// ZMQCPP-PGM-receive.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <stdio.h>
#include "zmq.hpp"
int main(int argc, char* argv[]) {
try {
zmq::context_t context(1);
// Socket to talk to server
printf ("Connecting to server...");
zmq::socket_t *s1 = new zmq::socket_t(context, ZMQ_SUB);
int recvTimeout = 3000;
s1->setsockopt(ZMQ_RCVTIMEO,&recvTimeout,sizeof(int));
int recvRate = 80000;
s1->setsockopt(ZMQ_RATE,&recvRate,sizeof(int));
int recsec = 60 * 60;
// s1->setsockopt(ZMQ_RECOVERY_IVL,&recsec,sizeof(recsec));
s1->connect("pgm://local_IP;239.255.0.20:30001");
s1->setsockopt (ZMQ_SUBSCRIBE, NULL, 0);
printf ("done. \n");
int seq=0;
while(true) {
zmq::message_t msgbuff;
int ret = s1->recv(&msgbuff,0);
if(!ret)
{
printf ("Received not received timeout\n");
continue;
}
printf ("Seq(%d) Received data size=%d\n",seq,msgbuff.size());
++seq;
}
}
catch( zmq::error_t &e ) {
printf ("An error occurred: %s\n", e.what());
return 1;
}
catch( std::exception &e ) {
printf ("An error occurred: %s\n", e.what());
return 1;
}
return 0;
}