0

我正在研究 ZeroMQ 的 PGM 支持。使用 NetMQ 库在 Windows 上运行(在以 MacOS 作为主机的 VirtualBox 中)。

我要做的测试很简单:尽可能快地从 A 向 B 发送消息......

首先,我使用 TCP 作为传输;这很容易达到每秒超​​过 150 000 条消息,两个接收器保持同步。然后我想测试PGM;我所做的只是在两边用“pgm://239.0.0.1:5557”替换地址“tcp://*:5556”。

现在,PGM 测试给出了非常奇怪的结果:发件人轻松达到 >200 000 条消息/秒;但是,接收者只能处理大约 500 条消息/秒!?

所以,我不明白发生了什么。在减慢发送者的速度后(每条消息后睡眠 10 毫秒,否则实际上不可能调查流),在我看来,接收者正在努力跟上,最初看到每条消息经过,然后窒息,错过了一系列消息,然后尝试再次跟上...我使用了 HWM 和恢复间隔设置,但这似乎没有太大区别(?!)。

谁能解释发生了什么?

非常感谢,弗雷德里克

注意:不确定是否重要:据我了解,我不使用 OpenPGM - 我只是下载 ZeroMQ 设置,并在 Windows 中启用“多播支持”。

这是发件人代码:

class MassSender
{
    private const string TOPIC_PREFIX = "Hello:";

    private static int messageCounter = 0;
    private static int timerCounter = 0;

    public static void Main(string[] args)
    {
        Timer timer = new Timer(1000);
        timer.Elapsed += timer_Elapsed;

        SendMessages_0MQ_NetMQ(timer);
    }

    private static void SendMessages_0MQ_NetMQ(Timer timer)
    {
        using (NetMQContext context = NetMQContext.Create())
        {
            using (NetMQSocket publisher = context.CreateSocket(ZmqSocketType.Pub))
            {
                //publisher.Bind("tcp://*:5556");
                publisher.Bind("pgm://239.0.0.1:5557"); // IP of interface is not specified so use default interface.

                timer.Start();
                while (true)
                {
                    string message = GetMessage();

                    byte[] body = Encoding.UTF8.GetBytes(message);
                    publisher.Send(body);
                }
            }
        }
    }

    private static string GetMessage()
    {
        return TOPIC_PREFIX + "Message " + (++messageCounter).ToString();
    }
    static void timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        Console.WriteLine("=== SENT {0} MESSAGES SO FAR - TOTAL AVERAGE IS {1}/s ===", messageCounter, messageCounter / ++timerCounter);
    }
}

和接收方:

class MassReceiver
{
    private const string TOPIC_PREFIX = "Hello:";

    private static int messageCounter = 0;
    private static int timerCounter = 0;
    private static string lastMessage = String.Empty;

    static void Main(string[] args)
    {
        // Assume that sender and receiver are started simultaneously.
        Timer timer = new Timer(1000);
        timer.Elapsed += timer_Elapsed;

        ReceiveMessages_0MQ_NetMQ(timer);
    }

    private static void ReceiveMessages_0MQ_NetMQ(Timer timer)
    {
        using (NetMQContext context = NetMQContext.Create())
        {
            using (NetMQSocket subscriber = context.CreateSocket(ZmqSocketType.Sub))
            {
                subscriber.Subscribe(""); // Subscribe to everything

                //subscriber.Connect("tcp://localhost:5556");
                subscriber.Connect("pgm://239.0.0.1:5557"); // IP of interface is not specified so use default interface.

                timer.Start();
                while (true)
                {
                    messageCounter++;

                    byte[] body = subscriber.Receive();

                    string message = Encoding.UTF8.GetString(body);                        
                    lastMessage = message; // Only show message when timer elapses, otherwise throughput drops dramatically.  
                }
            }
        }
    }

    static void timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        Console.WriteLine("=== RECEIVED {0} MESSAGES SO FAR - TOTAL AVERAGE IS {1}/s === (Last: {2})", messageCounter, messageCounter / ++timerCounter, lastMessage);
    }
}
4

2 回答 2

0

每条消息的大小是多少?

您没有使用 OpenPGM,您使用的是所谓的 ms-pgm(PGM 的 Microsoft 实现)。

无论如何,您可能必须更改套接字的 MulticastRate(默认为 100kbit/s)。

还有你用的是什么网络?

于 2014-08-09T16:34:05.097 回答
0

我遇到了同样的问题,发件人每秒可以发送数千条消息。但是我的接收器每秒只能接收两百条消息。

我认为这可能是发送或接收速率有限。我检查

ZMQ_RATE:在http://api.zeromq.org/3-0:zmq-setsockopt中设置多播数据速率

文件截图

默认速率仅为 100kb/s。

当我将其增加到 1Gb/s 时,现在一切正常。

const int rate = 1000000;                              // 1Gb TX- and RX- rate
m_socket.setsockopt(ZMQ_RATE, &rate, sizeof(rate));
于 2016-07-01T23:09:29.020 回答