1

我在NetMQ v4中有一个经销商 <--> 路由器设置,我可以在任何方向异步发送和接收消息,没有问题。

我现在想将其形式化为服务器(路由器)侦听任何传入消息的抽象,但它还需要按需向任何连接的客户端(经销商)广播消息。

我试图避免使用 Pub <--> Sub 套接字,因为我需要订阅者也向服务器发送消息。与我试图实现的最接近的模式是 WebSocket 客户端-服务器通信。

侦听客户端消息的第一部分是通过以下方式完成的:

using (var server = new RouterSocket("@tcp://*:80"))
{
    var addresses = new HashSet<string>();
    while (true)
    {
        var msg = server.ReceiveMultipartMessage();

        var address = Encoding.UTF8.GetString(msg[0].Buffer);
        var payload = Encoding.UTF8.GetString(msg[2].Buffer);
        Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload);

        var contains = addresses.Contains(address);
        if (!contains) { addresses.Add(address); }            

        msg.Clear();
        msg.Append(address);
        msg.AppendEmptyFrame();
        msg.Append("Reply for: " + address);
        server.SendMultipartMessage(msg);
    }
}

现在鉴于套接字不是线程安全的,我一直在寻找一种方法来向所有客户端广播消息(根据需要来自不同的线程)。

我可能可以TryReceiveMultipartMessage在循环中使用该方法,而不是设置超时,之后我可以检查队列中是否有任何广播消息,然后遍历每个发送此类消息的客户端。就像是:

using (var server = new RouterSocket("@tcp://*:80"))
{
    var addresses = new HashSet<string>();

    var msg = new NetMQMessage();
    while (true)
    {
        var clientHasMsg = server.TryReceiveMultipartMessage(TimeSpan.FromSeconds(1), ref msg);
        if (!clientHasMsg)
        {
            // Check any incoming broacast then loop through all the clients
            // sending each the brodcast msg
            var broadMsg = new NetMQMessage();
            foreach (var item in addresses)
            {
                broadMsg.Append(item);
                broadMsg.AppendEmptyFrame();
                broadMsg.Append("This is a broadcast");
                server.SendMultipartMessage(broadMsg);
                broadMsg.Clear();
            }

            // Go back into the loop waiting for client messages
            continue;
        }

        var address = Encoding.UTF8.GetString(msg[0].Buffer);
        var payload = Encoding.UTF8.GetString(msg[2].Buffer);
        Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload);

        var contains = addresses.Contains(address);
        if (!contains) { addresses.Add(address); }

        msg.Clear();
        msg.Append(address);
        msg.AppendEmptyFrame();
        msg.Append("Reply for: " + address);
        server.SendMultipartMessage(msg);
    }
}

这在某种程度上感觉不对,主要是因为:

  • 超时的什么值是一个好的值?1 秒、100 毫秒等;
  • 这是最有效/性能最好的解决方案吗,因为该程序将用于连接 10 万多个客户端,每个客户端每秒发送数千条消息。

任何关于什么是最好的方法的指针都非常感谢。

4

2 回答 2

2

您可以使用 netmqqueue,它是多生产者单消费者队列。您可以将其添加到 NetMQPoller 并从多个线程中排队,而无需锁定。

于 2016-10-11T19:58:13.547 回答
0

我认为 PUB/SUB 是满足 100k+ 客户需求的合适方法。然而,这并不意味着您不能与服务器通信:使用 DEALER/ROUTER。为什么你认为这个解决方案是不可接受的?

于 2016-10-11T16:31:17.010 回答