2

我有一个问题我设法解决了......但是我有点担心,因为我不太明白为什么解决方案有效;

我正在使用 NetMQ,特别是具有多个套接字的 NetMQ 轮询器,其中一个是 REQ-REP 对。

我有一个请求队列,这些请求队列被排成请求,服务器根据需要处理每种请求类型并发回适当的响应。这一直没有问题,但是当我尝试添加其他请求类型时,系统按预期停止工作;会发生的是请求将到达服务器,服务器将发送响应......而客户端将不会收到它。在服务器关闭之前,客户端不会收到消息(异常行为!)。

我一直在使用在发送请求之前设置的标志来管理 REQ-REP 对,并在收到回复后重置。我设法通过仅在 REQ 套接字的“SendReady”事件中触发回复来解决这个问题——这自动修复了我的所有问题,但是我在文档中找不到任何东西告诉我为什么套接字可能没有处于“发送就绪”状态,或者这实际上是做什么的。

任何可以解释为什么现在有效的信息都会很棒:)

干杯。

编辑:来源

客户:

“订阅”作为 UI 的单独线程运行

    private void Subscribe(string address)
    {
        using (var req = new RequestSocket(address + ":5555"))
        using (var sub = new SubscriberSocket(address + ":5556"))
        using (var poller = new NetMQPoller { req, sub })
        {
            // Send program code when a request for a code update is received
            sub.ReceiveReady += (s, a) =>
            {
                var type = sub.ReceiveFrameString();
                var reply = sub.ReceiveFrameString();

                switch (type)
                {
                    case "Type1":
                        manager.ChangeValue(reply);
                        break;

                    case "Type2":
                        string[] args = reply.Split(',');
                        eventAggregator.PublishOnUIThread(new MyEvent(args[0], (SimObjectActionEventType)Enum.Parse(typeof(MyEventType), args[1])));
                        break;
                }
            };

            req.ReceiveReady += Req_ReceiveReady;

            poller.RunAsync();

            sub.Connect(address + ":5556");
            sub.SubscribeToAnyTopic();
            sub.Options.ReceiveHighWatermark = 10;

            reqQueue = new Queue<string[]>();

            reqQueue.Enqueue(new string[] { "InitialiseClient", "" });

            req_sending = false;

            while (programRunning)
            {
                if (reqQueue.Count > 0 && !req_sending)
                {
                    req_sending = true;
                    string[] request = reqQueue.Dequeue();
                    Console.WriteLine("Sending " + request[0] + " " + request[1]);
                    req.SendMoreFrame(request[0]).SendFrame(request[1]);
                }

                Thread.Sleep(1);
            }
        }
    }

    private void Req_ReceiveReady(object sender, NetMQSocketEventArgs e)
    {
        var req = e.Socket;

        var messageType = req.ReceiveFrameString();

        Console.WriteLine("Received {0}", messageType);

        switch (messageType)
        {
            case "Reply1":
                // Receive action

                break;

            case "Reply2":
                // Receive action

                break;

            case "Reply3":
                // Receive action

                break;

        }

        req_sending = false;
    }

服务器:

        using (var rep = new ResponseSocket("@tcp://*:5555"))
        using (var pub = new PublisherSocket("@tcp://*:5556"))
        using (var beacon = new NetMQBeacon())
        using (var poller = new NetMQPoller { rep, pub, beacon })
        {
            // Send program code when a request for a code update is received
            rep.ReceiveReady += (s, a) =>
            {
                var messageType = rep.ReceiveFrameString();
                var message = rep.ReceiveFrameString();

                Console.WriteLine("Received {0} - Content: {1}", messageType, message);

                switch (messageType)
                {
                    case "InitialiseClient":
                        // Send
                        rep.SendMoreFrame("Reply1").SendFrame(repData);
                        break;

                    case "Req2":
                        // do something
                            rep.SendMoreFrame("Reply2").SendFrame("RequestOK");

                        break;

                    case "Req3":
                        args = message.Split(',');

                        if (args.Length == 2)
                        {
                            // Do Something

                            rep.SendMoreFrame("Reply3").SendFrame("RequestOK");
                        }
                        else
                        {
                            rep.SendMoreFrame("Ack").SendFrame("RequestError - incorrect argument format");
                        }

                        break;

                    case "Req4":
                        args = message.Split(',');

                        if (args.Length == 2)
                        {

                            requestData = //do something

                            rep.SendMoreFrame("Reply4").SendFrame(requestData);
                        }
                        else
                        {
                            rep.SendMoreFrame("Ack").SendFrame("RequestError - incorrect argument format");
                        }

                        break;

                    default:
                        rep.SendMoreFrame("Ack").SendFrame("Error");
                        break;
                }
            };

            // setup discovery beacon with 1 second interval
            beacon.Configure(5555);
            beacon.Publish("server", TimeSpan.FromSeconds(1));

            // start the poller
            poller.RunAsync();

            // run the simulation loop
            while (serverRunning)
            {
                // todo - make this operate for efficiently
                // push updated variable values to clients
                foreach (string[] message in pubQueue)
                {
                    pub.SendMoreFrame(message[0]).SendFrame(message[1]);
                }

                pubQueue.Clear();

                Thread.Sleep(2);
            }

            poller.StopAsync();
        }
4

2 回答 2

4

您正在使用来自多个线程的请求套接字,这是不受支持的。您在主线程上发送并在轮询线程上接收。

尝试使用 NetMQQueue,而不是使用常规队列,您可以将其添加到轮询器并从 UI 线程入队。然后发送发生在轮询线程以及接收上。

您可以在此处阅读文档:http: //netmq.readthedocs.io/en/latest/queue/

于 2016-06-17T14:57:21.990 回答
0

我能想到的唯一一件事是,只有在您实际完全收到消息(所有部分)后,REP 套接字才准备好发送。

于 2016-06-14T07:48:01.337 回答