3

我正在构建一个订阅 Redis 通道并使用服务器发送事件将消息推送到客户端的 Rack 中间件。Sinatra 提供了一个很好的 DSL 来做这件事。我有一个工作示例,但是,我遇到的问题是,一旦我到达 7 或 8 个客户端,性能就会大幅下降。在尝试重用请求之间的 Redis 连接时,我还遇到了“死锁”服务器的问题。

我正在使用 Thin 为应用程序提供服务(它在后台使用 EventMachine)。我认为 Sinatra DSL 已经使用 EventMachine 处理了并发,但也许这是我需要自己实现的东西?我不想将自己限制在仅基于 EventMachine 的服务器(Thin,Rainbows!),以防有人想使用像 Puma 这样的多线程服务器。我应该怎么做才能增加代码的并发性?

require 'redis'
require 'sinatra/base'

class SSE < Sinatra::Base

  def send_message(json)
    "id: #{Time.now}\n" +
    "data: #{json}" +
    "\r\n\n"
  end

  get '/channels/:id/subscribe', provides: 'text/event-stream' do
    channel_id = params['id']
    stream(:keep_open) do |connection|
      Redis.new.subscribe("channels:#{channel_id}") do |on|
        on.message do |channel, json|
          connection << send_message(json)
        end
      end
    end
  end

end
4

2 回答 2

1

经过大量研究和实验,这是我与 sinatra + sinatra sse gem 一起使用的代码:

class EventServer < Sinatra::Base
 include Sinatra::SSE
 set :connections, []
 .
 .
 .
 get '/channel/:channel' do
 .
 .
 .
  sse_stream do |out|
    settings.connections << out
    out.callback {
      puts 'Client disconnected from sse';
      settings.connections.delete(out);
    }
  redis.subscribe(channel) do |on|
      on.subscribe do |channel, subscriptions|
        puts "Subscribed to redis ##{channel}\n"
      end
      on.message do |channel, message|
        puts "Message from redis ##{channel}: #{message}\n"
        message = JSON.parse(message)
        .
        .
        .
        if settings.connections.include?(out)
          out.push(message)
        else
          puts 'closing orphaned redis connection'
          redis.unsubscribe
        end
      end
    end
  end
end

redis 连接阻塞 on.message 并且只接受 (p)subscribe/(p)unsubscribe 命令。取消订阅后,redis 连接将不再被阻止,并且可以由初始 sse 请求实例化的 Web 服务器对象释放。当您在 redis 上收到消息并且与浏览器的 sse 连接不再存在于集合数组中时,它会自动清除。

于 2014-09-24T19:31:57.767 回答
1

想到了一些事情,所以我将不按顺序迭代它们。

我正在使用 Thin 为应用程序提供服务(它在后台使用 EventMachine)。我认为 Sinatra DSL 已经使用 EventMachine 处理了并发,但也许这是我需要自己实现的东西?

你是对的,Thin 使用 EventMachine。然而,EventMachine(或任何其他反应器)的问题是,一旦您执行同步操作,您就会停止整个反应器。因此,要真正获得您期望的并发性,您需要在整个应用程序中继续使用 EventMachine。

为支持发布/订阅的 EventMachine 就绪 Redis 客户端检查em-hiredis

我不想将自己限制在仅基于 EventMachine 的服务器(Thin,Rainbows!),以防有人想使用像 Puma 这样的多线程服务器

从未尝试过我要说的内容,但我认为在没有的服务器中使用 EventMachine 不会有问题。只要记住启动自己的 EM。也许在 config.ru 中?

在尝试重用请求之间的 Redis 连接时,我还遇到了“死锁”服务器的问题

我相信您遇到这种情况的原因是因为每次调用 '/channels/:id/subscribe' 都会打开与 Redis 的新连接。你只能有这么多的开放。考虑为您的应用程序重构Redis.new为共享连接。只打开一次。单个 Redis 连接应该能够处理多个发布/订阅。

只是一些想法,我希望他们有所帮助。

于 2013-10-05T18:10:09.713 回答