-1

我在 ZeroMQ 上玩了一段时间,并提出了几个问题/问题。如果 ZeroMQ 的任何贡献者可以加入或任何使用或当前使用该库的人将不胜感激。

*假设我有一个路由器/转发器和 2 个不同的客户端(c1,c2)。我想通过路由设备将消息从 client1 推送到 client2。路由器从任何客户端(此处为 client1)提取消息并将它们发布到任何订阅的客户端(此处为 client2)。我目前将此类消息路由到相应客户端的唯一方法是通过 pub/sub,但是,a)我想通过发送 routingTo 标记以及消息正文来决定如何在运行时路由,b)我想使用 push /pull 转发给客户端,而不是发布/订阅,因为我想在设置高水位标记属性时实现阻塞功能,c)我想让 c1 和 c2 连接到 1 个端口用于推送和 1 个端口用于订阅。我能否以某种方式在路由器端进行更改以便不必使用 pub/sub 或者 pub/sub 是路由到客户端的唯一方法,即使我知道在路由端应该将消息转发到哪里?我读到当队列大小超过我不想要的 hwm 时,pub/sub 会丢弃消息。我也不想实现请求/回复模式,因为它增加了不必要的开销,因为我不需要回复。

*在运行以下代码(Push/Pull -> Pub/Sub)并发送所有消息并收到确认所有消息已收到后,推送消息的客户端仍然显示巨大的内存占用,显然仍然有大量消息在Push 套接字的队列。为什么会这样,我能做些什么来解决这个问题?

这是我的代码:

路由器:

class Program
{
    static void Main(string[] args)
    {
        using (var context = new Context(1))
        {
            using (Socket socketIn = context.Socket(SocketType.PULL), socketOut = context.Socket(SocketType.XPUB))
            {
                socketIn.HWM = 10000;
                socketOut.Bind("tcp://*:5560"); //forwards on this port
                socketIn.Bind("tcp://*:5559"); //listens on this port

                Console.WriteLine("Router started and running...");

                while (true)
                {
                    //Receive Message
                    byte[] address = socketIn.Recv();
                    byte[] body = socketIn.Recv();

                    //Forward Message
                    socketOut.SendMore(address);
                    socketOut.Send(body);
                }
            }
        }
    }
}

客户1:

class Program
{
    static void Main(string[] args)
    {
        using (var context = new Context(1))
        {
            using (Socket socketIn = context.Socket(SocketType.SUB), socketOut= context.Socket(SocketType.PUSH))
            {
                byte[] iAM = Encoding.Unicode.GetBytes("Client1");
                byte[] youAre = Encoding.Unicode.GetBytes("Client2");
                byte[] msgBody = new byte[16];

                socketOut.HWM = 10000;
                socketOut.Connect("tcp://localhost:5559");
                socketIn.Connect("tcp://localhost:5560");
                socketIn.Subscribe(iAM);

                Console.WriteLine("Press key to kick off Test Client1 Sending Routine");
                Console.ReadLine();

                for (int counter = 1; counter <= 10000000; counter++)
                {
                    //Send Message
                    socketOut.SendMore(youAre);
                    socketOut.Send(msgBody);
                }

                Console.WriteLine("Client1: Finished Sending");
                Console.ReadLine();
            }
        }
    }
}

客户2:

class Program
{
    public static int msgCounter;

    static void Main(string[] args)
    {
        msgCounter = 0;

        using (var context = new Context(1))
        {
            using (Socket socketIn = context.Socket(SocketType.SUB), socketOut = context.Socket(SocketType.PUSH))
            {
                byte[] iAM = Encoding.Unicode.GetBytes("Client2");

                socketOut.Connect("tcp://localhost:5559");
                socketIn.Connect("tcp://localhost:5560");
                socketIn.Subscribe(iAM);

                Console.WriteLine("Client2: Started Listening");

                //Receive First Message
                byte[] address = socketIn.Recv();
                byte[] body = socketIn.Recv();
                msgCounter += 1;

                Console.WriteLine("Received first message");

                Stopwatch watch = new Stopwatch();
                watch.Start();

                while (msgCounter < 10000000)
                {
                    //Receive Message
                    address = socketIn.Recv();
                    body = socketIn.Recv();
                    msgCounter += 1;
                }

                watch.Stop();
                Console.WriteLine("Elapsed Time: " + watch.ElapsedMilliseconds + "ms");
                Console.ReadLine();
            }
        }
    }
}
4

1 回答 1

2

我将建议您的架构可能有点偏离这里。

1) 如果您需要一个 PUSH 和一个 PULL,请从中间移除设备。设备被显式添加到架构中以管理多个消费者,这样您就不必在每次添加节点时更新生产者。当/如果您确实到达需要多个消费者和/或生产者的地方,您将需要连接到设备上的每个节点——这就是它们的工作方式。在这种情况下,听起来该设备使您的解决方案过于复杂。

2)拥有“路线”标签的想法真的让我大吃一惊。选择消息传递而不是其他集成选项的最大原因可能是将生产者和消费者解耦,这样任何一方都不必知道对方的任何信息(除了在无代理设计的情况下将消息发送到哪里)。将路由信息直接添加到您的逻辑中会破坏这一点。

至于开销,我从未经历过。但是,我以前从未使用过 ZeroMQ 的 .Net 驱动程序,因此未受过教育的猜测是查看 .Net 驱动程序本身。

于 2012-04-22T18:23:22.530 回答