3

我们有一个充当服务器的 Java 应用程序。客户端应用程序(用 C# 编写)使用 ZeroMQ 与它通信。我们(大部分)遵循懒惰的海盗模式。

服务器有一个路由器套接字,实现如下(使用 JeroMQ):

ZContext context = new ZContext();
Socket socket = context.createSocket(ZMQ.ROUTER);
socket.bind("tcp://*:5555");

客户端连接并发送如下消息:

ZContext context = ZContext.Create();
ZSocket socket = ZSocket.Create(context, ZSocketType.REQ);
socket.Identity = Encoding.UTF8.GetBytes("Some identity");
socket.Connect("tcp://my_host:5555");
socket.Send(new ZFrame("request data"));

当多个客户端同时发送消息时,我们会遇到消息丢失的情况。对于单个客户端,似乎没有任何问题。

我们是否以正确的方式实施多客户端单服务器设置?

更新:显示此行为的示例客户端和服务器:

服务器:

import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;

public class SimpleServer
{
    public static void main(String[] args) throws InterruptedException
    {
        ZContext context = new ZContext();
        Socket socket = context.createSocket(ZMQ.ROUTER);
        socket.setRouterMandatory(true);
        socket.bind("tcp://*:5559");

        PollItem pollItem = new PollItem(socket, Poller.POLLIN);

        int messagesReceived = 0;
        int pollCount = 0;

        while ((pollCount = ZMQ.poll(new PollItem[]{pollItem}, 3000)) > -1)
        {
            messagesReceived += pollCount;

            for (int i = 0 ; i < pollCount ; i++)
            {
                ZMsg msg = ZMsg.recvMsg(socket);
                System.out.println(String.format("Received message: %s. Total messages received: %d", msg, messagesReceived));
            }

            if (pollCount == 0)
            {
                System.out.println(String.format("No messages on socket. Total messages received: %d", messagesReceived));
            }
        }
    }
}

客户:

using NetMQ;
using System;
using System.Text;

namespace SimpleClient
{
    class Program
    {
        static byte[] identity = Encoding.UTF8.GetBytes("id" + DateTime.UtcNow.Ticks);

        static void Main(string[] args)
        {
            for (int i = 0; i < 100; i++)
            {
                SendMessage();
            }
        }

        private static void SendMessage()
        {
            using (NetMQContext context = NetMQContext.Create())
            {
                using (NetMQSocket socket = context.CreateRequestSocket())
                {
                    socket.Options.Identity = identity;
                    socket.Connect("tcp://localhost:5559");
                    socket.Send(Encoding.UTF8.GetBytes("hello!"));
                }
            }
        }
    }
}

如果我运行服务器和单个客户端,我可以看到所有 100 条消息都到达。如果我同时运行 5 个客户端,我只会收到大约 200 -> 300 条消息到达,而不是全部 500 条。顺便说一句,关闭客户端中的套接字似乎会以某种方式停止服务器上的路由器套接字简短地接收消息,尽管这只是一个理论。

4

3 回答 3

2

第 1 部分 - 民意调查可能会返回多个事件

ZMQ.poll()返回找到的事件数:

int rc = ZMQ.poll(new PollItem[]{pollItem}, 3000);

您目前假设一个返回poll是一个事件。相反,您应该遍历ZMsg msg = ZMsg.recvMsg(socket);返回的事件数ZMQ.Poll()

从 JeroMQ 的来源

/**
 * Polling on items. This has very poor performance.
 * Try to use zmq_poll with selector
 * CAUTION: This could be affected by jdk epoll bug
 *
 * @param items
 * @param timeout
 * @return number of events
 */
public static int zmq_poll(PollItem[] items, long timeout)
{
    return zmq_poll(items, items.length, timeout);
}

第 2 部分 - ZMsg.receive() 可能返回多个帧

当您收到ZMsgfromZMsg msg = ZMsg.recvMsg(socket);时,ZMsg可能包含多个ZFrames,每个都包含客户端数据。

ZMsgJeroMQ 源代码中的类的评论中:

 * // Receive message from ZMQSocket "input" socket object and iterate over frames
 * ZMsg receivedMessage = ZMsg.recvMsg(input);
 * for (ZFrame f : receivedMessage) {
 *     // Do something with frame f (of type ZFrame)
 * }

第 3 部分 - 消息可以跨多个 ZFrame 拆分

来自 JeroMQ 中 ZFrame 的源代码

 * The ZFrame class provides methods to send and receive single message
 * frames across 0MQ sockets. A 'frame' corresponds to one underlying zmq_msg_t in the libzmq code.
 * When you read a frame from a socket, the more() method indicates if the frame is part of an
 * unfinished multipart message.

如果我理解正确,那么对于每个事件,您可能会得到多个帧,并且一个客户端消息可能会映射到 1..N 帧(如果消息很大?)。

所以总结一下:

  • 民意调查的一次返回可能表示多个事件。
  • 一个事件,因此一个事件ZMsg.receive()可能包含多个帧
  • 一帧可以包含一条完整的客户端消息或仅包含客户端消息的一部分;一个客户端消息映射到 1..N 帧。
于 2015-04-14T19:25:47.277 回答
2

不幸的是,我们无法解决这个特定的问题,并且已经不再使用 ZeroMQ 作为这个接口。如果它对其他人有帮助,我们唯一确定的事情是快速打开/关闭请求套接字会导致路由器套接字端出现不良行为(丢失消息)。性能不佳的服务器 CPU 加剧了该问题,并且当服务器在快速多核机器上时根本不会出现。

于 2015-04-22T17:14:22.940 回答
1

不幸的是,当这个问题出现时,我什至没有与 ZMQ 密切合作。但是我今天遇到了同样的问题并找到了这个页面。你的回答(不使用 ZMQ)对我来说并不令人满意。所以我搜索了更多,终于找到了该怎么做。

提醒一下:这适用于 ZMQ [1] 中的“POLLER”

如果您使用“PAIR”连接,您肯定不会丢失任何文件,发送/接收大约需要。同时。所以你不能加快速度,也不是我的解决方案。

解决方案:

  • 在 zmq_setsockopt (python: zmq.setsockopt) 中,您可以将 ZMQ_HWM (zmq.SNDHWM, zmq.RCVHWM) 设置为 '0' [2]

    • 在 python 中:sock.setsockopt(zmq.SNDHWM , 0) resp。sock.setsockopt(zmq.RCVHWM, 0)分别代表发件人。接收器

    • 注意:我认为符号从 HWM 更改为 SNDWHM/RCVHWM

    • HWM = 0 表示消息数量没有“限制”(所以要小心,可能设置一个(非常高的)限制)

  • 还有 ZMQ_SNDBUF/ ZMQ_RCVBUF (python: zmq.SNDBUF / zmq.RCVBUF ) 你也可以给出,即。sock.setsockopt(zmq.RCVBUF, 0)分别 ..... [2]

    • 所以这会将操作系统“SO_RCVBUF”设置为默认值(我的知识到此结束)

    • 是否设置此参数并没有影响我的情况,但我认为它可能

表现:

所以有了这个我可以在〜8s(〜10GB)内“发送”100'000个98kB的文件:这将填满你的RAM(如果这已满,我认为你的程序会变慢),另见图片

与此同时,我“接收”并将文件保存在大约 ~在此处输入图像描述118 秒并再次释放 RAM

另外,到目前为止,我从来没有丢失过一个文件。(如果您达到 PC 的极限,您可能会这样做)

数据丢失是“好”:

  • 如果您真的需要所有数据,则应使用此方法

  • 如果您认为一些损失很好(例如实时绘图:只要您的 FPS > ~50,您将顺利看到绘图并且您不在乎是否丢失某些东西)

  • --> 您可以节省 RAM 并避免阻塞您的整个 PC!

希望这篇文章对下一个过来的人有所帮助...

[1]:https ://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/multisocket/zmqpoller.htm

[2]:http ://api.zeromq.org/2-1:zmq-setsockopt

您会找到她的 RAM 的图片: RAM 大约在 8 秒内加载完毕。之后磁盘正在保存缓冲区中的文件

于 2019-08-22T07:31:08.070 回答