7

我有多个使用 redisstore 水平扩展的 socket.io 服务器。我已经有效地设置了房间,并且能够成功地跨服务器广播到房间等。现在我正在尝试构建一个状态页面,而我未能弄清楚的是如何简单地计算连接的用户数量所有服务器。

io.sockets.clients('room') 和 io.sockets.sockets 只会告诉你那台服务器上连接的客户端数量,而不是所有连接到同一个 RedisStore 的服务器。

建议?

谢谢。

4

4 回答 4

3

当用户连接到聊天室时,您可以在 RedisStore 中自动增加用户计数器。当用户断开连接时,您会减小该值。通过这种方式,Redis 维护了用户数量并且可供所有服务器访问。

INCRDECR

SET userCount = "0"

当用户连接时:

INCR userCount

当用户断开连接时:

DECR userCount
于 2012-08-28T17:58:09.623 回答
3

这是我使用 Redis 脚本解决它的方法。它需要 2.6 或更高版本,因此现在很可能仍需要编译您自己的实例。

每次进程启动时,我都会生成一个新的 UUID 并将其保留在全局范围内。我可以使用 pid,但这感觉更安全一些。

# Pardon my coffeescript
processId = require('node-uuid').v4()

当用户连接时(socket.io 连接事件),然后我将该用户的 id 推送到基于该 processId 的用户列表中。我还将该密钥的到期时间设置为 30 秒。

RedisClient.lpush "process:#{processId}", user._id
RedisClient.expire "process:#{processId}", 30

当用户断开连接(断开连接事件)时,我将其删除并更新到期时间。

RedisClient.lrem "process:#{processId}", 1, user._id
RedisClient.expire "process:#{processId}", 30

我还设置了一个以 30 秒间隔运行的函数,以基本上“ping”该键,使其保持在那里。因此,如果进程确实意外终止,所有这些用户会话将基本上消失。

setInterval ->
  RedisClient.expire "process:#{processId}", 30
, 30 * 1000

现在来说说魔法。Redis 2.6 包含 LUA 脚本,它本质上提供了一种存储过程的功能。它真的很快而且不是非常密集的处理器(他们将其与“几乎”运行的 C 代码进行比较)。

我的存储过程基本上循环遍历所有进程列表,并创建一个 user:user_id 键及其当前登录总数。这意味着如果他们使用两个浏览器等登录,它仍然允许我使用逻辑来判断他们是完全断开连接,还是只是其中一个会话。

我在所有进程上每 15 秒运行一次这个函数,在连接/断开事件之后也是如此。这意味着我的用户计数很可能会精确到秒,并且不会超过 15 到 30 秒。

生成该 redis 函数的代码如下所示:

def = require("promised-io/promise").Deferred

reconcileSha = ->
  reconcileFunction = "
    local keys_to_remove = redis.call('KEYS', 'user:*')
    for i=1, #keys_to_remove do
      redis.call('DEL', keys_to_remove[i])
    end

    local processes = redis.call('KEYS', 'process:*')
    for i=1, #processes do
      local users_in_process = redis.call('LRANGE', processes[i], 0, -1)
      for j=1, #users_in_process do
        redis.call('INCR', 'user:' .. users_in_process[j])
      end
    end
  "

  dfd = new def()
  RedisClient.script 'load', reconcileFunction, (err, res) ->
    dfd.resolve(res)
  dfd.promise

然后我可以稍后在我的脚本中使用它:

reconcileSha().then (sha) ->
  RedisClient.evalsha sha, 0, (err, res) ->
    # do stuff

我做的最后一件事是尝试处理一些关闭事件,以确保进程尝试最好不要依赖 redis 超时并真正优雅地关闭。

gracefulShutdown = (callback) ->
  console.log "shutdown"
  reconcileSha().then (sha) ->
    RedisClient.del("process:#{processId}")
    RedisClient.evalsha sha, 0, (err, res) ->
      callback() if callback?

# For ctrl-c
process.once 'SIGINT', ->
  gracefulShutdown ->
    process.kill(process.pid, 'SIGINT')

# For nodemon
process.once 'SIGUSR2', ->
  gracefulShutdown ->
    process.kill(process.pid, 'SIGUSR2')

到目前为止,它运行良好。

我仍然想做的一件事是让 redis 函数返回任何已更改其值的键。这样,如果特定用户的计数发生了变化,而没有任何服务器主动知道(例如进程死亡),我实际上可以发送一个事件。现在,我必须依靠再次轮询 user:* 值才能知道它已更改。它有效,但它可能会更好......

于 2013-04-16T12:52:39.287 回答
1

我通过让每台服务器定期在 redis 中设置一个用户计数来解决这个问题,其中包含他们自己的 pid:

每做setex userCount:<pid> <interval+10> <count>

然后状态服务器可以查询每个键,然后获取每个键的值:

对于每个keys userCount*做总+=get <key>

因此,如果服务器崩溃或关闭,那么它的计数将在间隔+10 后从 redis 中退出

对丑陋的伪代码感到抱歉。:)

于 2012-09-04T15:58:44.053 回答
0

您可以使用哈希键来存储值。

当用户连接到服务器 1 时,您可以在名为“userCounts”的键上设置一个名为“srv1”的字段。只需将值覆盖为当前使用HSET的计数。无需增加/减少。只需设置 socket.io 已知的当前值。

HSET userCounts srv1 "5"

当另一个用户连接到不同的服务器时,设置不同的字段。

HSET userCounts srv2 "10"

然后任何服务器都可以通过返回“userCounts”中的所有字段并使用HVALS将它们相加以返回值列表来获得总数。

HVALS userCounts

当服务器崩溃时,您需要运行一个脚本来响应崩溃,将该服务器的字段从 userCounts 或 HSET 中删除为“0”。

您可以查看Forever以自动重新启动服务器。

于 2012-08-28T21:28:49.043 回答