我正在研究 ZeroMQ 的 PGM 支持。使用 NetMQ 库在 Windows 上运行(在以 MacOS 作为主机的 VirtualBox 中)。
我要做的测试很简单:尽可能快地从 A 向 B 发送消息......
首先,我使用 TCP 作为传输;这很容易达到每秒超过 150 000 条消息,两个接收器保持同步。然后我想测试PGM;我所做的只是在两边用“pgm://239.0.0.1:5557”替换地址“tcp://*:5556”。
现在,PGM 测试给出了非常奇怪的结果:发件人轻松达到 >200 000 条消息/秒;但是,接收者只能处理大约 500 条消息/秒!?
所以,我不明白发生了什么。在减慢发送者的速度后(每条消息后睡眠 10 毫秒,否则实际上不可能调查流),在我看来,接收者正在努力跟上,最初看到每条消息经过,然后窒息,错过了一系列消息,然后尝试再次跟上...我使用了 HWM 和恢复间隔设置,但这似乎没有太大区别(?!)。
谁能解释发生了什么?
非常感谢,弗雷德里克
注意:不确定是否重要:据我了解,我不使用 OpenPGM - 我只是下载 ZeroMQ 设置,并在 Windows 中启用“多播支持”。
这是发件人代码:
class MassSender
{
private const string TOPIC_PREFIX = "Hello:";
private static int messageCounter = 0;
private static int timerCounter = 0;
public static void Main(string[] args)
{
Timer timer = new Timer(1000);
timer.Elapsed += timer_Elapsed;
SendMessages_0MQ_NetMQ(timer);
}
private static void SendMessages_0MQ_NetMQ(Timer timer)
{
using (NetMQContext context = NetMQContext.Create())
{
using (NetMQSocket publisher = context.CreateSocket(ZmqSocketType.Pub))
{
//publisher.Bind("tcp://*:5556");
publisher.Bind("pgm://239.0.0.1:5557"); // IP of interface is not specified so use default interface.
timer.Start();
while (true)
{
string message = GetMessage();
byte[] body = Encoding.UTF8.GetBytes(message);
publisher.Send(body);
}
}
}
}
private static string GetMessage()
{
return TOPIC_PREFIX + "Message " + (++messageCounter).ToString();
}
static void timer_Elapsed(object sender, ElapsedEventArgs e)
{
Console.WriteLine("=== SENT {0} MESSAGES SO FAR - TOTAL AVERAGE IS {1}/s ===", messageCounter, messageCounter / ++timerCounter);
}
}
和接收方:
class MassReceiver
{
private const string TOPIC_PREFIX = "Hello:";
private static int messageCounter = 0;
private static int timerCounter = 0;
private static string lastMessage = String.Empty;
static void Main(string[] args)
{
// Assume that sender and receiver are started simultaneously.
Timer timer = new Timer(1000);
timer.Elapsed += timer_Elapsed;
ReceiveMessages_0MQ_NetMQ(timer);
}
private static void ReceiveMessages_0MQ_NetMQ(Timer timer)
{
using (NetMQContext context = NetMQContext.Create())
{
using (NetMQSocket subscriber = context.CreateSocket(ZmqSocketType.Sub))
{
subscriber.Subscribe(""); // Subscribe to everything
//subscriber.Connect("tcp://localhost:5556");
subscriber.Connect("pgm://239.0.0.1:5557"); // IP of interface is not specified so use default interface.
timer.Start();
while (true)
{
messageCounter++;
byte[] body = subscriber.Receive();
string message = Encoding.UTF8.GetString(body);
lastMessage = message; // Only show message when timer elapses, otherwise throughput drops dramatically.
}
}
}
}
static void timer_Elapsed(object sender, ElapsedEventArgs e)
{
Console.WriteLine("=== RECEIVED {0} MESSAGES SO FAR - TOTAL AVERAGE IS {1}/s === (Last: {2})", messageCounter, messageCounter / ++timerCounter, lastMessage);
}
}