我已经使用Heroku 教程来实现 websockets。
它适用于 Thin,但不适用于 Unicorn 和 Puma。
还实现了一个回显消息,它响应客户端的消息。它在每台服务器上都能正常工作,因此 websockets 实现没有问题。
Redis 设置也是正确的(它捕获所有消息,并执行subscribe
块内的代码)。
它现在是如何工作的:
在服务器启动时,初始化一个空@clients
数组。然后启动新线程,它正在侦听 Redis,并打算将该消息从 @clients 数组发送给相应的用户。
在页面加载时,会创建新的 websocket 连接,它存储在 @clients 数组中。
如果我们从浏览器收到消息,我们会将其发送回与同一用户连接的所有客户端(该部分在 Thin 和 Puma 上都正常工作)。
如果我们收到来自 Redis 的消息,我们还会查找存储在 @clients 数组中的所有用户连接。这就是奇怪的事情发生的地方:
如果使用 Thin 运行,它会在 @clients 数组中找到连接并将消息发送给它们。
如果使用 Puma/Unicorn 运行,@clients 数组始终为空,即使我们按该顺序尝试(无需重新加载页面或其他任何操作):
- 从浏览器发送消息 ->
@clients.length
为 1,消息已传递 - 通过 Redis 发送消息 ->
@clients.length
为 0,消息丢失 - 从浏览器发送消息 ->
@clients.length
仍然为 1,消息已传递
- 从浏览器发送消息 ->
有人可以澄清一下我错过了什么吗?
Puma服务器相关配置:
workers 1
threads_count = 1
threads threads_count, threads_count
相关中间件代码:
require 'faye/websocket'
class NotificationsBackend
def initialize(app)
@app = app
@clients = []
Thread.new do
redis_sub = Redis.new
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
# logging @clients.length from here will always return 0
# [..] retrieve user
send_message(user.id, { message: "ECHO: #{event.data}"} )
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
ws.on :open do |event|
# [..] retrieve current user
if user
# add ws connection to @clients array
else
# close ws
end
end
ws.on :message do |event|
# [..] retrieve current user
Redis.current.publish({user_id: user.id, { message: "ECHO: #{event.data}"}} )
end
ws.rack_response
else
@app.call(env)
end
end
def send_message user_id, message
# logging @clients.length here will always return correct result
# cs = all connections which belong to that client
cs.each { |c| c.send(message.to_json) }
end
end