68

当结合 socket.io/node.js 和 redis pub/sub 试图创建一个由服务器事件驱动的可以处理多个传输的实时网络广播系统时,似乎有三种方法:

  1. 'createClient' 一个 redis 连接并订阅频道。在 socket.io 客户端连接上,将客户端加入到 socket.io 房间。在 redis.on("message", ...) 事件中,调用 io.sockets.in(room).emit("event", data) 分发给相关房间的所有客户端。比如如何在socket.io中重用redis连接?

  2. 'createClient' 一个 redis 连接。在 socket.io 客户端连接上,将客户端加入到 socket.io 房间并订阅相关的 redis 频道。在客户端连接关闭中包含 redis.on("message", ...) 并在收到消息调用 client.emit("event", data) 以在特定客户端上引发事件。就像在 socket.io 中使用 RedisStore 的示例中的答案一样

  3. 根据 socketio-spec 协议,使用 RedisStore 烘焙到 socket.io 中并从 Redis 中的单个“调度”通道“广播”。

数字 1 允许为所有客户端处理一次 Redis 子和关联事件。2 号提供了一个更直接的挂钩到 Redis 发布/订阅。数字 3 更简单,但对消息传递事件几乎没有控制。

但是,在我的测试中,所有连接的客户端都表现出出乎意料的低性能。有问题的服务器事件是尽快发布到 redis 通道的 1,000 条消息,以便尽快分发。性能是通过连接客户端的时间来衡量的(socket.io-client 基于将时间戳记录到 Redis 列表中进行分析)。

我推测在选项 1 中,服务器接收到消息,然后将其依次写入所有连接的客户端。在选项 2 中,服务器多次接收每条消息(每个客户端订阅一次)并将其写入相关客户端。在任何一种情况下,服务器都不会到达第二个消息事件,直到它与所有连接的客户端通信。随着并发性的增加,情况明显恶化。

这似乎与堆栈功能的感知智慧不一致。我想相信,但我很挣扎。

这种情况(大量消息的低延迟分布)是不是这些工具的选项(还没有?),还是我错过了一个技巧?

4

1 回答 1

30

我认为这是一个合理的问题,并在不久前对其进行了简短的研究。我花了一些时间搜索示例,您可以从中获得一些有用的提示。

例子

我喜欢从简单的例子开始:

轻量级示例是单页(请注意,您需要将 redis-node-client 替换为 Matt Ranney 的node_redis 之类的东西:

/*
 * Mclarens Bar: Redis based Instant Messaging
 * Nikhil Marathe - 22/04/2010

 * A simple example of an IM client implemented using
 * Redis PUB/SUB commands so that all the communication
 * is offloaded to Redis, and the node.js code only
 * handles command interpretation,presentation and subscribing.
 * 
 * Requires redis-node-client and a recent version of Redis
 *    http://code.google.com/p/redis
 *    http://github.com/fictorial/redis-node-client
 *
 * Start the server then telnet to port 8000
 * Register with NICK <nick>, use WHO to see others
 * Use TALKTO <nick> to initiate a chat. Send a message
 * using MSG <nick> <msg>. Note its important to do a
 * TALKTO so that both sides are listening. Use STOP <nick>
 * to stop talking to someone, and QUIT to exit.
 *
 * This code is in the public domain.
 */
var redis = require('./redis-node-client/lib/redis-client');

var sys = require('sys');
var net = require('net');

var server = net.createServer(function(stream) {
    var sub; // redis connection
    var pub;
    var registered = false;
    var nick = "";

    function channel(a,b) {
    return [a,b].sort().join(':');
    }

    function shareTable(other) {
    sys.debug(nick + ": Subscribing to "+channel(nick,other));
    sub.subscribeTo(channel(nick,other), function(channel, message) {
        var str = message.toString();
        var sender = str.slice(0, str.indexOf(':'));
        if( sender != nick )
        stream.write("[" + sender + "] " + str.substr(str.indexOf(':')+1) + "\n");
    });
    }

    function leaveTable(other) {
    sub.unsubscribeFrom(channel(nick,other), function(err) {
        stream.write("Stopped talking to " + other+ "\n");
    });
    }

    stream.addListener("connect", function() {
    sub = redis.createClient();
    pub = redis.createClient();
    });

    stream.addListener("data", function(data) {
    if( !registered ) {
        var msg = data.toString().match(/^NICK (\w*)/);
        if(msg) {
        stream.write("SERVER: Hi " + msg[1] + "\n");
        pub.sadd('mclarens:inside', msg[1], function(err) {
            if(err) {
            stream.end();
            }
            registered = true;
            nick = msg[1];
// server messages
            sub.subscribeTo( nick + ":info", function(nick, message) {
            var m = message.toString().split(' ');
            var cmd = m[0];
            var who = m[1];
            if( cmd == "start" ) {
                stream.write( who + " is now talking to you\n");
                shareTable(who);
            }
            else if( cmd == "stop" ) {
                stream.write( who + " stopped talking to you\n");
                leaveTable(who);
            }
            });
        });
        }
        else {
        stream.write("Please register with NICK <nickname>\n");
        }
        return;
    }

    var fragments = data.toString().replace('\r\n', '').split(' ');
    switch(fragments[0]) {
    case 'TALKTO':
        pub.publish(fragments[1]+":info", "start " + nick, function(a,b) {
        });
        shareTable(fragments[1]);
        break;
    case 'MSG':
        pub.publish(channel(nick, fragments[1]),
            nick + ':' +fragments.slice(2).join(' '),
              function(err, reply) {
              if(err) {
                  stream.write("ERROR!");
              }
              });
        break;
    case 'WHO':
        pub.smembers('mclarens:inside', function(err, users) {
        stream.write("Online:\n" + users.join('\n') + "\n");
        });
        break;
    case 'STOP':
        leaveTable(fragments[1]);
        pub.publish(fragments[1]+":info", "stop " + nick, function() {});
        break;
    case 'QUIT':
        stream.end();
        break;
    }
    });

    stream.addListener("end", function() {
    pub.publish(nick, nick + " is offline");
    pub.srem('mclarens:inside', nick, function(err) {
        if(err) {
        sys.debug("Could not remove client");
        }
    });
    });
});

server.listen(8000, "localhost");

文件

那里有大量文档,并且 api 在这种类型的堆栈上迅速变化,因此您必须权衡每个文档的时间相关性。

相关问题

只是几个相关的问题,这是堆栈上的热门话题:

值得注意的提示(ymmv)

关闭或优化套接字池,使用有效的绑定,监控延迟,并确保您没有重复工作(即无需向所有侦听器发布两次)。

于 2012-06-13T21:31:51.190 回答