1

我在https://netmq.readthedocs.io/上采用了一个简单的接收/请求套接字示例,并希望使其在无限循环中与 parametrizedThread 一起工作。该代码在几个循环中运行良好,之后它会抛出

无法立即完成非阻塞套接字操作

对于我得到的上述内容,应该在第一个循环之后立即发生,而不是随机发生。这里有什么问题?听起来必须清除某些东西才能再次获得干净的连接(不确定)。

    class Program
{
    public class Connector
    {
        public String connection { get; set; }
        public ResponseSocket server { get; set; }

        public Connector(string address, ResponseSocket server_)
        {
            this.connection = address;
            this.server = server_;
        }
    }

    static void Main(string[] args)
    {
        string connection = "tcp://localhost:5555";
        using (var server = new ResponseSocket())
        {
            while (true)
            {
                try
                {
                    server.Bind(connection);
                }
                catch (NetMQException e)
                {
                    Console.WriteLine(e.ErrorCode);
                }

                Connector c = new Connector(connection, server);

                ParameterizedThreadStart parametrizedClientThread = new ParameterizedThreadStart(runClientSide);
                Thread t = new Thread(parametrizedClientThread);
                t.Start(c);
                //runClientSide(connection, server);
            }
        }
    }

    private static void runClientSide(object param)
    {
        Connector conn = (Connector)param;
        string connection = conn.connection;
        ResponseSocket server = conn.server;
        using (var client = new RequestSocket())
        {
            client.Connect(connection);
            client.SendFrame("Hello");

            string fromClientMessage = server.ReceiveFrameString();
            Console.WriteLine("From Client: {0}", fromClientMessage);
            server.SendFrame("Hi Back");

            string fromServerMessage = client.ReceiveFrameString();
            Console.WriteLine("From Server: {0}", fromServerMessage);

            //Console.ReadLine();
        }
    }
4

1 回答 1

2

NetMQSockets 不是线程安全的,您正在从客户端线程内部访问服务器以发送/接收数据。无论如何,客户端都不应该访问服务器套接字。

首先将 Bind 移到 while 循环之外,它只需要一次,而不是创建的每个客户端。要等待消息,请使用NetMQPoller,它会为您处理所有其他事情,并在收到消息后引发服务器 ReceiveReady 事件。

static void Main(string[] args) {
    string connection = "tcp://localhost:5555";
    using (var poller = new NetMQPoller()) {
        using (var server = new ResponseSocket()) {
            server.ReceiveReady += Server_ReceiveReady;
            poller.Add(server);
            poller.RunAsync();

            server.Bind(connection);

            // start 10000 clients
            for(int i = 0; i < 10000; i++) {

                ParameterizedThreadStart parametrizedClientThread = new ParameterizedThreadStart(runClientSide);
                Thread t = new Thread(parametrizedClientThread);
                t.Start(connection);
            }

            Console.ReadLine(); //let server run until user pressed Enter key
        }
    }
}

//server (e.Socket) is receiving data here and can answer it
private static void Server_ReceiveReady(object sender, NetMQSocketEventArgs e) {
    string fromClientMessage = e.Socket.ReceiveFrameString();
    Console.WriteLine("From Client: {0}", fromClientMessage);
    e.Socket.SendFrame("Hi Back");
}

private static void runClientSide(object param) {
    string connection = (string) param;

    using (var client = new RequestSocket()) {
        client.Connect(connection);
        client.SendFrame("Hello");

        //Removed server side code here and put it into ReceiveReady event

        string fromServerMessage = client.ReceiveFrameString();
        Console.WriteLine("From Server: {0}", fromServerMessage);
    }
}
于 2017-01-18T21:47:15.487 回答