我在与NetMQ进行 ROUTER-ROUTER 通信时遇到问题。
两个应用程序,客户端和服务器。两者都使用 ROUTER 套接字。客户端路由器套接字没有明确设置它的身份,只有服务器。
第一次建立连接后,消息从客户端(ROUTER)路由到服务器(ROUTER)。但是一旦客户端再次调用 Disconnect() 然后 Connect() (在很短的时间后,即 10 毫秒),服务器就不再接收消息了。
有趣的是,如果使用ZeroMq C# lib,则重新连接可以正常工作。
这是一个已知问题还是我在这里做错了什么?
重现问题的最少代码:
public class PureNetMQ
{
private static readonly byte[] ServerIdentity = {1, 2, 3};
private static readonly byte[] ClientIdentity = {1, 2, 4};
private static readonly string Address = "tcp://127.0.0.1:5000";
private static void Main(string[] args)
{
var cancellationTokenSource = new CancellationTokenSource();
Task.Factory.StartNew(_ => RunServer(cancellationTokenSource.Token), cancellationTokenSource.Token, TaskCreationOptions.LongRunning);
Thread.Sleep(TimeSpan.FromSeconds(2));
RunClient();
Console.WriteLine("Done");
Console.ReadLine();
cancellationTokenSource.Cancel(true);
}
private static void RunClient()
{
using (var context = NetMQContext.Create())
{
using (NetMQSocket socket = context.CreateRouterSocket())
{
//socket.Options.Identity = Guid.NewGuid().ToByteArray();
//socket.Options.RouterMandatory = true;
socket.Options.Linger = TimeSpan.Zero;
socket.Connect(Address);
Thread.Sleep(TimeSpan.FromSeconds(2));
socket.SendMore(ServerIdentity);
socket.SendMore(Encoding.UTF8.GetBytes(""));
socket.Send(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()));
Console.WriteLine("Message sent...");
socket.Disconnect(Address);
Thread.Sleep(TimeSpan.FromSeconds(5));
Console.WriteLine("Disconnected");
//socket.Options.Identity = Guid.NewGuid().ToByteArray();
socket.Connect(Address);
Thread.Sleep(TimeSpan.FromSeconds(5));
Console.WriteLine("Reconnected");
socket.SendMore(ServerIdentity);
socket.SendMore(Encoding.UTF8.GetBytes(""));
socket.Send(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()));
Console.WriteLine("Message sent...");
Thread.Sleep(TimeSpan.FromSeconds(5));
}
}
}
private static void RunServer(CancellationToken token)
{
using (var context = NetMQContext.Create())
{
using (var socket = context.CreateRouterSocket())
{
socket.Options.Identity = ServerIdentity;
//socket.Options.RouterMandatory = true;
socket.Options.Linger = TimeSpan.Zero;
socket.Bind(Address);
while (!token.IsCancellationRequested)
{
var message = socket.ReceiveMessage();
for (var i = 2; i < message.FrameCount; i++)
{
Console.WriteLine("Message: {0}", Encoding.UTF8.GetString(message[i].Buffer));
}
}
}
}
}
}