我有多个使用 redisstore 水平扩展的 socket.io 服务器。我已经有效地设置了房间,并且能够成功地跨服务器广播到房间等。现在我正在尝试构建一个状态页面,而我未能弄清楚的是如何简单地计算连接的用户数量所有服务器。
io.sockets.clients('room') 和 io.sockets.sockets 只会告诉你那台服务器上连接的客户端数量,而不是所有连接到同一个 RedisStore 的服务器。
建议?
谢谢。
我有多个使用 redisstore 水平扩展的 socket.io 服务器。我已经有效地设置了房间,并且能够成功地跨服务器广播到房间等。现在我正在尝试构建一个状态页面,而我未能弄清楚的是如何简单地计算连接的用户数量所有服务器。
io.sockets.clients('room') 和 io.sockets.sockets 只会告诉你那台服务器上连接的客户端数量,而不是所有连接到同一个 RedisStore 的服务器。
建议?
谢谢。
这是我使用 Redis 脚本解决它的方法。它需要 2.6 或更高版本,因此现在很可能仍需要编译您自己的实例。
每次进程启动时,我都会生成一个新的 UUID 并将其保留在全局范围内。我可以使用 pid,但这感觉更安全一些。
# Pardon my coffeescript
processId = require('node-uuid').v4()
当用户连接时(socket.io 连接事件),然后我将该用户的 id 推送到基于该 processId 的用户列表中。我还将该密钥的到期时间设置为 30 秒。
RedisClient.lpush "process:#{processId}", user._id
RedisClient.expire "process:#{processId}", 30
当用户断开连接(断开连接事件)时,我将其删除并更新到期时间。
RedisClient.lrem "process:#{processId}", 1, user._id
RedisClient.expire "process:#{processId}", 30
我还设置了一个以 30 秒间隔运行的函数,以基本上“ping”该键,使其保持在那里。因此,如果进程确实意外终止,所有这些用户会话将基本上消失。
setInterval ->
RedisClient.expire "process:#{processId}", 30
, 30 * 1000
现在来说说魔法。Redis 2.6 包含 LUA 脚本,它本质上提供了一种存储过程的功能。它真的很快而且不是非常密集的处理器(他们将其与“几乎”运行的 C 代码进行比较)。
我的存储过程基本上循环遍历所有进程列表,并创建一个 user:user_id 键及其当前登录总数。这意味着如果他们使用两个浏览器等登录,它仍然允许我使用逻辑来判断他们是完全断开连接,还是只是其中一个会话。
我在所有进程上每 15 秒运行一次这个函数,在连接/断开事件之后也是如此。这意味着我的用户计数很可能会精确到秒,并且不会超过 15 到 30 秒。
生成该 redis 函数的代码如下所示:
def = require("promised-io/promise").Deferred
reconcileSha = ->
reconcileFunction = "
local keys_to_remove = redis.call('KEYS', 'user:*')
for i=1, #keys_to_remove do
redis.call('DEL', keys_to_remove[i])
end
local processes = redis.call('KEYS', 'process:*')
for i=1, #processes do
local users_in_process = redis.call('LRANGE', processes[i], 0, -1)
for j=1, #users_in_process do
redis.call('INCR', 'user:' .. users_in_process[j])
end
end
"
dfd = new def()
RedisClient.script 'load', reconcileFunction, (err, res) ->
dfd.resolve(res)
dfd.promise
然后我可以稍后在我的脚本中使用它:
reconcileSha().then (sha) ->
RedisClient.evalsha sha, 0, (err, res) ->
# do stuff
我做的最后一件事是尝试处理一些关闭事件,以确保进程尝试最好不要依赖 redis 超时并真正优雅地关闭。
gracefulShutdown = (callback) ->
console.log "shutdown"
reconcileSha().then (sha) ->
RedisClient.del("process:#{processId}")
RedisClient.evalsha sha, 0, (err, res) ->
callback() if callback?
# For ctrl-c
process.once 'SIGINT', ->
gracefulShutdown ->
process.kill(process.pid, 'SIGINT')
# For nodemon
process.once 'SIGUSR2', ->
gracefulShutdown ->
process.kill(process.pid, 'SIGUSR2')
到目前为止,它运行良好。
我仍然想做的一件事是让 redis 函数返回任何已更改其值的键。这样,如果特定用户的计数发生了变化,而没有任何服务器主动知道(例如进程死亡),我实际上可以发送一个事件。现在,我必须依靠再次轮询 user:* 值才能知道它已更改。它有效,但它可能会更好......
我通过让每台服务器定期在 redis 中设置一个用户计数来解决这个问题,其中包含他们自己的 pid:
每做setex userCount:<pid> <interval+10> <count>
然后状态服务器可以查询每个键,然后获取每个键的值:
对于每个keys userCount*
做总+=get <key>
因此,如果服务器崩溃或关闭,那么它的计数将在间隔+10 后从 redis 中退出
对丑陋的伪代码感到抱歉。:)
您可以使用哈希键来存储值。
当用户连接到服务器 1 时,您可以在名为“userCounts”的键上设置一个名为“srv1”的字段。只需将值覆盖为当前使用HSET的计数。无需增加/减少。只需设置 socket.io 已知的当前值。
HSET userCounts srv1 "5"
当另一个用户连接到不同的服务器时,设置不同的字段。
HSET userCounts srv2 "10"
然后任何服务器都可以通过返回“userCounts”中的所有字段并使用HVALS将它们相加以返回值列表来获得总数。
HVALS userCounts
当服务器崩溃时,您需要运行一个脚本来响应崩溃,将该服务器的字段从 userCounts 或 HSET 中删除为“0”。
您可以查看Forever以自动重新启动服务器。