3

我创建了 2 个简单的 C# 控制台项目(.net 4.5.2),向每个项目添加了 v4.0.0.1 NetMQ Nuget 包,将每个程序加载到单独的 Visual Studio 2017 社区版中,在包含在OnReceiveReady 回调方法,先启动订阅者程序,再启动发布者程序。订阅者中未触发 ReceieveReady 事件。我究竟做错了什么?即使我选择了 subSocket.Subscribe("") ,我仍然没有收到任何消息。此外,删除/修改 Send/Receive HighWatermarks 也没有改变。谢谢你的帮助!

这是发布者代码:

using System;
using NetMQ;
using NetMQ.Sockets;
using System.Threading;

namespace SampleNQPub
{
    class Program
    {
        static void Main(string[] args)
        {
            var addr = "tcp://127.0.0.1:3004";

            using (var pubSocket = new PublisherSocket())
            {
                Console.WriteLine("Publisher socket binding.");
                pubSocket.Options.SendHighWatermark = 10;
                pubSocket.Bind(addr);

                for (int i=0; i < 30; i++)
                {
                    pubSocket.SendMoreFrame("NQ").SendFrame(i.ToString());
                    Thread.Sleep(1000);
                }

                pubSocket.Disconnect(addr);
            }
        }
    }
}

这是订阅者代码:

using System.Threading;
using NetMQ;
using NetMQ.Sockets;

namespace SampleNQSub
{
    class Program
    {
        static void Main(string[] args)
        {
            var addr = "tcp://127.0.0.1:3004";

            using (var subSocket = new SubscriberSocket())
            {
                subSocket.ReceiveReady += OnReceiveReady;
                subSocket.Options.ReceiveHighWatermark = 10;
                subSocket.Connect(addr);
                subSocket.Subscribe("NQ");

                for (int i=0; i < 20; i++)
                {
                    Thread.Sleep(1000);
                }

                subSocket.Disconnect(addr);
            }
        }

        static void OnReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            var str = e.Socket.ReceiveFrameString();
        }
    }
}
4

1 回答 1

2

好的,这是 NetMQ 世界中的一个难题,我刚刚想通了。您必须设置一个 NetMQPoller,它最终会调用您添加到其中的每个 ReceiveReady 回调 (NetMQPoller)。

这是至少(即 ReceiveFrameString 仍然只获得“NQ”部分,但这只是另一个要修复的方法调用)触发 ReceiveReady 事件的更正代码:

using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace SampleNQSub
{
    class Program
    {
        static void Main(string[] args)
        {
            var addr = "tcp://127.0.0.1:3004";

            NetMQPoller poller = new NetMQPoller();

            using (var subSocket = new SubscriberSocket())
            {
                subSocket.ReceiveReady += OnReceiveReady;
                subSocket.Options.ReceiveHighWatermark = 10;
                subSocket.Connect(addr);
                subSocket.Subscribe("NQ");

                poller.Add(subSocket);
                poller.RunAsync();

                for (int i = 0; i < 20; i++)
                {
                    Thread.Sleep(1000);
                }

                subSocket.Disconnect(addr);
            }
        }

        static void OnReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            var str = e.Socket.ReceiveFrameString();
            e.Socket.ReceiveMultipartStrings();
        }
    }
}

我注意到 NetMQ 的作者决定在 4.x 中在内部处理 Context 对象,这样用户就不必承担管理它的负担。如果他们也可以对用户隐藏这个“轮询泵”代码以及最简单的用例,那就太好了。

作为比较,使用我在上面发布的 Publisher 控制台应用程序查看使用 NodeJS(带有 zmq 库)的订阅者(将此代码保存到 sub.js,并在 Windows 控制台中,键入“node sub.js”):

var zmq = require('zmq'), sock = zmq.socket('sub');

sock.connect('tcp://127.0.0.1:3004');
sock.subscribe('NQ');
console.log('Subscriber connected to port 3004');

sock.on('message', function() {
    var msg = [];
    Array.prototype.slice.call(arguments).forEach(function(arg) {
        msg.push(arg.toString());
    });

    console.log(msg);
});

那么轮询泵机制在哪里呢?(回答:我不在乎!我只希望在我注册的回调中提供给我的消息。[显然,开玩笑。我知道 NetMQPoller 是通用的,可以处理更复杂的问题,但对于基本的“当它到达时在回调中给我一条消息”,如果它由库内部处理会很好。])

于 2017-06-23T19:18:12.367 回答