35

背景:我们已经在我们现有的 Rails 应用程序之一中构建了聊天功能。我们正在使用新ActionController::Live模块并运行 Puma(在生产中使用 Nginx),并通过 Redis 订阅消息。我们正在使用EventSource客户端异步建立连接。

问题摘要:当连接终止时,线程永远不会死亡。

例如,如果用户导航离开,关闭浏览器,甚至转到应用程序中的不同页面,则会产生一个新线程(如预期的那样),但旧线程继续存在。

我目前看到的问题是,当任何这些情况发生时,服务器无法知道浏览器端的连接是否终止,直到有东西尝试写入这个损坏的流,一旦浏览器永远不会发生这种情况已离开原始页面。

这个问题似乎记录在 github 上,并且在 StackOverflow 上提出了类似的问题(非常完全相同的问题)这里(关于获取活动线程的数量)

根据这些帖子,我能够提出的唯一解决方案是实现一种线程/连接扑克。尝试写入断开的连接会生成一个IOError我可以捕获并正确关闭连接的连接,从而使线程死亡。这是该解决方案的控制器代码:

def events
  response.headers["Content-Type"] = "text/event-stream"

  stream_error = false; # used by flusher thread to determine when to stop

  redis = Redis.new

  # Subscribe to our events
  redis.subscribe("message.create", "message.user_list_update") do |on| 
    on.message do |event, data| # when message is received, write to stream
      response.stream.write("messageType: '#{event}', data: #{data}\n\n")
    end

    # This is the monitor / connection poker thread
    # Periodically poke the connection by attempting to write to the stream
    flusher_thread = Thread.new do
      while !stream_error
        $redis.publish "message.create", "flusher_test"
        sleep 2.seconds
      end
    end
  end 

  rescue IOError
    logger.info "Stream closed"
    stream_error = true;
  ensure
    logger.info "Events action is quitting redis and closing stream!"
    redis.quit
    response.stream.close
end

(注意:该events方法似乎在subscribe方法调用时被阻塞。其他一切(流式传输)都正常工作,所以我认为这是正常的。)

(其他说明:flusher 线程概念作为单个长时间运行的后台进程更有意义,有点像垃圾线程收集器。我上面实现的问题是为每个连接生成一个新线程,这是没有意义的。任何人尝试实现这个概念应该更像一个单一的进程,而不是像我概述的那样。当我成功地将它重新实现为一个单一的后台进程时,我会更新这篇文章。)

这个解决方案的缺点是我们只是延迟或减少了问题,并没有完全解决它。除了 ajax 等其他请求外,我们每个用户仍然有 2 个线程,从扩展的角度来看,这似乎很糟糕;对于具有许多可能的并发连接的大型系统来说,这似乎是完全无法实现和不切实际的。

我觉得我错过了一些重要的东西;如果没有像我一样实现自定义连接检查器,我觉得有点难以相信 Rails 有一个明显被破坏的特性。

问题:我们如何允许连接/线程终止而不实现诸如“连接扑克”或垃圾线程收集器之类的老生常谈?

像往常一样让我知道我是否遗漏了任何东西。

更新 只是为了添加一些额外的信息:在 github 上的 Huetsch 发布了这条评论,指出 SSE 基于 TCP,它通常在连接关闭时发送一个 FIN 数据包,让另一端(在这种情况下为服务器)知道关闭连接是安全的。Huetsch 指出浏览器没有发送该数据包(可能是EventSource库中的错误?),或者 Rails 没有捕获它或对其进行任何处理(如果是这种情况,肯定是 Rails 中的错误)。搜索还在继续……

使用 Wireshark 的另一个更新 ,我确实可以看到正在发送的 FIN 数据包。诚然,我对协议级别的东西不是很了解或经验丰富,但是据我所知,当我使用来自浏览器的 EventSource 建立 SSE 连接时,我肯定检测到从浏览器发送的 FIN 数据包,如果我没有发送数据包删除该连接(意味着没有 SSE)。虽然我对 TCP 的了解并不十分了解,但这似乎向我表明客户端确实正确终止了连接;也许这表明 Puma 或 Rails 中存在错误。

另一个更新 @JamesBoutcher / boutcheratwest(github) 向我指出了 redis 网站上关于这个问题的讨论.(p)subscribe,特别是关于该方法永远不会关闭的事实。该站点上的发布者指出了与我们在这里发现的相同的事情,即当客户端连接关闭时,Rails 环境永远不会收到通知,因此无法执行该.(p)unsubscribe方法。他询问超时.(p)subscribe方法,我认为也可以,但我不确定哪种方法(我上面描述的连接扑克,或他的超时建议)会是更好的解决方案。理想情况下,对于连接扑克解决方案,我想找到一种方法来确定连接是否在另一端关闭而不写入流。正如你所看到的那样,我必须实现客户端代码来分别处理我的“戳”消息,我认为这很突兀和愚蠢。

4

7 回答 7

15

我刚刚做的一个解决方案(从@teeg借了很多东西)似乎可以正常工作(还没有失败测试它,tho)

配置/初始化程序/redis.rb

$redis = Redis.new(:host => "xxxx.com", :port => 6379)

heartbeat_thread = Thread.new do
  while true
    $redis.publish("heartbeat","thump")
    sleep 30.seconds
  end
end

at_exit do
  # not sure this is needed, but just in case
  heartbeat_thread.kill
  $redis.quit
end

然后在我的控制器中:

def events
    response.headers["Content-Type"] = "text/event-stream"
    redis = Redis.new(:host => "xxxxxxx.com", :port => 6379)
    logger.info "New stream starting, connecting to redis"
    redis.subscribe(['parse.new','heartbeat']) do |on|
      on.message do |event, data|
        if event == 'parse.new'
          response.stream.write("event: parse\ndata: #{data}\n\n")
        elsif event == 'heartbeat'
          response.stream.write("event: heartbeat\ndata: heartbeat\n\n")
        end
      end
    end
  rescue IOError
    logger.info "Stream closed"
  ensure
    logger.info "Stopping stream thread"
    redis.quit
    response.stream.close
  end
于 2013-10-21T02:27:18.100 回答
4

我目前正在制作一个围绕 ActionController:Live、EventSource 和 Puma 的应用程序,对于那些在关闭流等时遇到问题的应用程序,而不是救援一个IOError,在 Rails 4.2 中你需要救援ClientDisconnected。例子:

def stream
  #Begin is not required
  twitter_client = Twitter::Streaming::Client.new(config_params) do |obj|
    # Do something
  end
rescue ClientDisconnected
  # Do something when disconnected
ensure
  # Do something else to ensure the stream is closed
end

我从这个论坛帖子中找到了这个方便的提示(一直在底部):http ://railscasts.com/episodes/401-actioncontroller-live?view=comments

于 2015-02-06T00:37:22.553 回答
2

在@James Boutcher 的基础上,我在集群 Puma 中使用了以下内容和 2 个工作人员,因此我在 config/initializers/redis.rb 中只为心跳创建了 1 个线程:

配置/puma.rb

on_worker_boot do |index|
  puts "worker nb #{index.to_s} booting"
  create_heartbeat if index.to_i==0
end

def create_heartbeat
  puts "creating heartbeat"
  $redis||=Redis.new
  heartbeat = Thread.new do
    ActiveRecord::Base.connection_pool.release_connection
    begin
      while true
        hash={event: "heartbeat",data: "heartbeat"}
        $redis.publish("heartbeat",hash.to_json)
        sleep 20.seconds
      end
    ensure
      #no db connection anyway
    end
  end
end
于 2014-12-24T00:37:02.083 回答
2

这是一个可能更简单的解决方案,它不使用心跳。经过大量研究和实验,这是我与 sinatra + sinatra sse gem 一起使用的代码(应该很容易适应 Rails 4):

class EventServer < Sinatra::Base
 include Sinatra::SSE
 set :connections, []
 .
 .
 .
 get '/channel/:channel' do
 .
 .
 .
  sse_stream do |out|
    settings.connections << out
    out.callback {
      puts 'Client disconnected from sse';
      settings.connections.delete(out);
    }
  redis.subscribe(channel) do |on|
      on.subscribe do |channel, subscriptions|
        puts "Subscribed to redis ##{channel}\n"
      end
      on.message do |channel, message|
        puts "Message from redis ##{channel}: #{message}\n"
        message = JSON.parse(message)
        .
        .
        .
        if settings.connections.include?(out)
          out.push(message)
        else
          puts 'closing orphaned redis connection'
          redis.unsubscribe
        end
      end
    end
  end
end

redis 连接阻塞 on.message 并且只接受 (p)subscribe/(p)unsubscribe 命令。取消订阅后,redis 连接将不再被阻止,并且可以由初始 sse 请求实例化的 Web 服务器对象释放。当您在 redis 上收到消息并且与浏览器的 sse 连接不再存在于集合数组中时,它会自动清除。

于 2014-04-09T19:04:40.293 回答
2

在这里,您是超时的解决方案,它将退出阻塞 Redis.(p)subscribe 调用并终止未使用的连接进程。

class Stream::FixedController < StreamController
  def events
    # Rails reserve a db connection from connection pool for
    # each request, lets put it back into connection pool.
    ActiveRecord::Base.clear_active_connections!

    # Last time of any (except heartbeat) activity on stream
    # it mean last time of any message was send from server to client
    # or time of setting new connection
    @last_active = Time.zone.now

    # Redis (p)subscribe is blocking request so we need do some trick
    # to prevent it freeze request forever.
    redis.psubscribe("messages:*", 'heartbeat') do |on|
      on.pmessage do |pattern, event, data|
        # capture heartbeat from Redis pub/sub
        if event == 'heartbeat'
          # calculate idle time (in secounds) for this stream connection
          idle_time = (Time.zone.now - @last_active).to_i

          # Now we need to relase connection with Redis.(p)subscribe
          # chanel to allow go of any Exception (like connection closed)
          if idle_time > 4.minutes
            # unsubscribe from Redis because of idle time was to long
            # that's all - fix in (almost)one line :)
            redis.punsubscribe
          end
        else
          # save time of this (last) activity
          @last_active = Time.zone.now
        end
        # write to stream - even heartbeat - it's sometimes chance to
        # capture dissconection error before idle_time
        response.stream.write("event: #{event}\ndata: #{data}\n\n")
      end
    end
    # blicking end (no chance to get below this line without unsubscribe)
  rescue IOError
    Logs::Stream.info "Stream closed"
  rescue ClientDisconnected
    Logs::Stream.info "ClientDisconnected"
  rescue ActionController::Live::ClientDisconnected
    Logs::Stream.info "Live::ClientDisconnected"
  ensure
    Logs::Stream.info "Stream ensure close"
    redis.quit
    response.stream.close
  end
end

你必须使用 reds.(p)unsubscribe 来结束这个阻塞调用。没有例外可以打破这一点。

我的简单应用程序包含有关此修复的信息:https ://github.com/piotr-kedziak/redis-subscribe-stream-puma-fix

于 2015-11-13T07:26:14.830 回答
1

与其向所有客户端发送心跳,不如为每个连接设置一个看门狗可能更容易。[感谢@NeilJewers]

class Stream::FixedController < StreamController
  def events
    # Rails reserve a db connection from connection pool for
    # each request, lets put it back into connection pool.
    ActiveRecord::Base.clear_active_connections!

    redis = Redis.new

    watchdog = Doberman::WatchDog.new(:timeout => 20.seconds)
    watchdog.start

    # Redis (p)subscribe is blocking request so we need do some trick
    # to prevent it freeze request forever.
    redis.psubscribe("messages:*") do |on|
      on.pmessage do |pattern, event, data|
        begin
          # write to stream - even heartbeat - it's sometimes chance to
          response.stream.write("event: #{event}\ndata: #{data}\n\n")
          watchdog.ping

        rescue Doberman::WatchDog::Timeout => e
          raise ClientDisconnected if response.stream.closed?
          watchdog.ping
        end
      end
    end

  rescue IOError
  rescue ClientDisconnected

  ensure
    response.stream.close
    redis.quit
    watchdog.stop
  end
end
于 2017-10-28T22:46:18.383 回答
1

如果您可以容忍丢失消息的可能性很小,您可以使用subscribe_with_timeout

sse = SSE.new(response.stream)
sse.write("hi", event: "hello")
redis = Redis.new(reconnect_attempts: 0)
loop do
  begin
    redis.subscribe_with_timeout(5 * 60, 'mycoolchannel') do |on|
      on.message do |channel, message|
        sse.write(message, event: 'message_posted')
      end
    end
  rescue Redis::TimeoutError
    sse.write("ping", event: "ping")
  end
end

此代码订阅 Redis 频道,等待 5 分钟,然后关闭与 Redis 的连接并再次订阅。

于 2021-10-04T21:33:29.033 回答