我在 ZeroMQ 上玩了一段时间,并提出了几个问题/问题。如果 ZeroMQ 的任何贡献者可以加入或任何使用或当前使用该库的人将不胜感激。
*假设我有一个路由器/转发器和 2 个不同的客户端(c1,c2)。我想通过路由设备将消息从 client1 推送到 client2。路由器从任何客户端(此处为 client1)提取消息并将它们发布到任何订阅的客户端(此处为 client2)。我目前将此类消息路由到相应客户端的唯一方法是通过 pub/sub,但是,a)我想通过发送 routingTo 标记以及消息正文来决定如何在运行时路由,b)我想使用 push /pull 转发给客户端,而不是发布/订阅,因为我想在设置高水位标记属性时实现阻塞功能,c)我想让 c1 和 c2 连接到 1 个端口用于推送和 1 个端口用于订阅。我能否以某种方式在路由器端进行更改以便不必使用 pub/sub 或者 pub/sub 是路由到客户端的唯一方法,即使我知道在路由端应该将消息转发到哪里?我读到当队列大小超过我不想要的 hwm 时,pub/sub 会丢弃消息。我也不想实现请求/回复模式,因为它增加了不必要的开销,因为我不需要回复。
*在运行以下代码(Push/Pull -> Pub/Sub)并发送所有消息并收到确认所有消息已收到后,推送消息的客户端仍然显示巨大的内存占用,显然仍然有大量消息在Push 套接字的队列。为什么会这样,我能做些什么来解决这个问题?
这是我的代码:
路由器:
class Program
{
static void Main(string[] args)
{
using (var context = new Context(1))
{
using (Socket socketIn = context.Socket(SocketType.PULL), socketOut = context.Socket(SocketType.XPUB))
{
socketIn.HWM = 10000;
socketOut.Bind("tcp://*:5560"); //forwards on this port
socketIn.Bind("tcp://*:5559"); //listens on this port
Console.WriteLine("Router started and running...");
while (true)
{
//Receive Message
byte[] address = socketIn.Recv();
byte[] body = socketIn.Recv();
//Forward Message
socketOut.SendMore(address);
socketOut.Send(body);
}
}
}
}
}
客户1:
class Program
{
static void Main(string[] args)
{
using (var context = new Context(1))
{
using (Socket socketIn = context.Socket(SocketType.SUB), socketOut= context.Socket(SocketType.PUSH))
{
byte[] iAM = Encoding.Unicode.GetBytes("Client1");
byte[] youAre = Encoding.Unicode.GetBytes("Client2");
byte[] msgBody = new byte[16];
socketOut.HWM = 10000;
socketOut.Connect("tcp://localhost:5559");
socketIn.Connect("tcp://localhost:5560");
socketIn.Subscribe(iAM);
Console.WriteLine("Press key to kick off Test Client1 Sending Routine");
Console.ReadLine();
for (int counter = 1; counter <= 10000000; counter++)
{
//Send Message
socketOut.SendMore(youAre);
socketOut.Send(msgBody);
}
Console.WriteLine("Client1: Finished Sending");
Console.ReadLine();
}
}
}
}
客户2:
class Program
{
public static int msgCounter;
static void Main(string[] args)
{
msgCounter = 0;
using (var context = new Context(1))
{
using (Socket socketIn = context.Socket(SocketType.SUB), socketOut = context.Socket(SocketType.PUSH))
{
byte[] iAM = Encoding.Unicode.GetBytes("Client2");
socketOut.Connect("tcp://localhost:5559");
socketIn.Connect("tcp://localhost:5560");
socketIn.Subscribe(iAM);
Console.WriteLine("Client2: Started Listening");
//Receive First Message
byte[] address = socketIn.Recv();
byte[] body = socketIn.Recv();
msgCounter += 1;
Console.WriteLine("Received first message");
Stopwatch watch = new Stopwatch();
watch.Start();
while (msgCounter < 10000000)
{
//Receive Message
address = socketIn.Recv();
body = socketIn.Recv();
msgCounter += 1;
}
watch.Stop();
Console.WriteLine("Elapsed Time: " + watch.ElapsedMilliseconds + "ms");
Console.ReadLine();
}
}
}
}