4

我已经使用Heroku 教程来实现 websockets。

它适用于 Thin,但不适用于 Unicorn 和 Puma。

还实现了一个回显消息,它响应客户端的消息。它在每台服务器上都能正常工作,因此 websockets 实现没有问题。

Redis 设置也是正确的(它捕获所有消息,并执行subscribe块内的代码)。

它现在是如何工作的:

在服务器启动时,初始化一个空@clients数组。然后启动新线程,它正在侦听 Redis,并打算将该消息从 @clients 数组发送给相应的用户。

在页面加载时,会创建新的 websocket 连接,它存储在 @clients 数组中。

如果我们从浏览器收到消息,我们会将其发送回与同一用户连接的所有客户端(该部分在 Thin 和 Puma 上都正常工作)。

如果我们收到来自 Redis 的消息,我们还会查找存储在 @clients 数组中的所有用户连接。这就是奇怪的事情发生的地方:

  • 如果使用 Thin 运行,它会在 @clients 数组中找到连接并将消息发送给它们。

  • 如果使用 Puma/Unicorn 运行,@clients 数组始终为空,即使我们按该顺序尝试(无需重新加载页面或其他任何操作):

    1. 从浏览器发送消息 ->@clients.length为 1,消息已传递
    2. 通过 Redis 发送消息 ->@clients.length为 0,消息丢失
    3. 从浏览器发送消息 ->@clients.length仍然为 1,消息已传递

有人可以澄清一下我错过了什么吗?

Puma服务器相关配置:

workers 1
threads_count = 1
threads threads_count, threads_count

相关中间件代码:

require 'faye/websocket'

class NotificationsBackend

  def initialize(app)
    @app     = app
    @clients = []
    Thread.new do
      redis_sub = Redis.new
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          # logging @clients.length from here will always return 0
          # [..] retrieve user
          send_message(user.id, { message: "ECHO: #{event.data}"} )
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
      ws.on :open do |event|
        # [..] retrieve current user
        if user
          # add ws connection to @clients array
        else
          # close ws
        end
      end

      ws.on :message do |event|
        # [..] retrieve current user
        Redis.current.publish({user_id: user.id, { message: "ECHO: #{event.data}"}} )
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
  def send_message user_id, message
    # logging @clients.length here will always return correct result
    # cs = all connections which belong to that client
    cs.each { |c| c.send(message.to_json) }
  end
end
4

2 回答 2

4

Unicorn(显然是 puma)都启动了一个主进程,然后分叉一个或多个工人。fork 复制整个进程(或至少呈现复制的错觉 - 实际副本通常仅在您写入页面时发生)您的整个进程,但只有调用的线程fork存在于新进程中。

显然,您的应用程序在被分叉之前正在初始化——这通常是为了让工作人员可以快速启动并从写入时复制内存节省中受益。因此,您的 redis 检查线程仅在主进程中运行,而@clients在子进程中被修改。

您可以通过推迟创建 redis 线程或禁用应用程序预加载来解决此问题,但是您应该注意,您的设置将阻止您扩展单个工作进程(使用 puma 和像 jruby 这样的线程友好的 JVM 会减少约束)

于 2015-06-11T07:14:27.830 回答
3

以防万一有人遇到同样的问题,我想出了两个解决方案:

1.禁用应用程序预加载(这是我想出的第一个解决方案)

只需preload_app!从 puma.rb 文件中删除。因此,所有线程都会有自己的@clients变量。并且它们将可以通过其他中间件方法(如call等)访问

缺点:您将失去应用程序预加载的所有好处。如果您只有 1 或 2 名工作人员和几个线程,这是可以的,但如果您需要很多工作人员,那么最好有应用程序预加载。所以我继续我的研究,这是另一个解决方案:

2.将线程初始化移出initialize方法(这是我现在使用的)

例如,我将它移到call方法中,所以中间件类代码如下所示:

attr_accessor :subscriber

def call(env)
  @subscriber ||= Thread.new do # if no subscriber present, init new one
    redis_sub = Redis.new(url: ENV['REDISCLOUD_URL'])
    redis_sub.subscribe(CHANNEL) do |on|
      on.message do |_, msg|
        # parsing message code here, retrieve user
        send_message(user.id, { message: "ECHO: #{event.data}"} )
      end
    end
  end
  # other code from method
end

两种解决方案都解决了相同的问题:将为每个 Puma 工作线程/线程初始化 Redis 侦听线程,而不是为主进程(实际上不服务请求)初始化。

于 2015-09-13T08:27:13.483 回答