8

我有一个对客户端发送的消息做出反应的应用程序。一条消息是reload_credentials,应用程序在新客户端注册时收到。然后,此消息将连接到 PostgreSQL 数据库,查询所有凭据,然后将它们存储在常规 Ruby 哈希 (client_id => client_token) 中。

应用程序可能收到的其他一些消息是start, stoppause它们用于跟踪某些会话时间。我的观点是,我设想应用程序以下列方式运行:

  • 客户端发送消息
  • 消息排队
  • 正在处理队列

但是,例如,我不想阻塞反应器。此外,假设我有一条reload_credentials消息在队列中。在从数据库重新加载凭据之前,我不希望处理队列中的任何其他消息。此外,当我正在处理某个消息(例如等待凭据查询完成)时,我希望允许将其他消息排入队列。

你能指导我解决这样的问题吗?我想我可能必须使用em-synchrony,但我不确定。

4

2 回答 2

7

使用 Postgresql EM 驱动程序之一或 EM.defer,这样您就不会阻塞反应器。

当您收到“reload_credentials”消息时,只需翻转一个标志,该标志会导致所有后续消息入队。'reload_credentials' 完成后,处理队列中的所有消息。队列为空后,翻转导致消息在收到时被处理的标志。

此处列出了 Postgresql 的 EM 驱动程序:https ://github.com/eventmachine/eventmachine/wiki/Protocol-Implementations

module Server
  def post_init
    @queue               = []
    @loading_credentials = false
  end

  def recieve_message(type, data)
    return @queue << [type, data] if @loading_credentials || !@queue.empty?
    return process_msg(type, data) unless :reload_credentials == type
    @loading_credentials = true
    reload_credentials do
      @loading_credentials = false
      process_queue
    end
  end

  def reload_credentials(&when_done)
    EM.defer( proc { query_and_load_credentials }, when_done )
  end


  def process_queue
    while (type, data = @queue.shift)
      process_msg(type, data)
    end
  end

  # lots of other methods
end

EM.start_server(HOST, PORT, Server)

如果您希望所有连接在任何连接收到“reload_connections”消息时都将消息排队,则必须通过特征类进行协调。

于 2012-09-08T13:45:50.120 回答
4

以下是我假设的,类似于您当前的实现:

    class Worker
      def initialize queue
        @queue = queue
        dequeue
      end

      def dequeue
        @queue.pop do |item|
          begin
            work_on item
          ensure
            dequeue
          end
        end
      end

      def work_on item
        case item.type
        when :reload_credentials
          # magic happens here
        else
          # more magic happens here
        end
      end
    end


    q = EM::Queue.new

    workers = Array.new(10) { Worker.new q }

如果我理解正确的话,上面的问题是你不希望工人从事工作(在生产者时间线中较早到达的工作),而不是任何 reload_credentials 工作。以下内容应为此提供服务(最后还有额外的警告)。

    class Worker
      def initialize queue
        @queue = queue
        dequeue
      end

      def dequeue
        @queue.pop do |item|
          begin
            work_on item
          ensure
            dequeue
          end
        end
      end

      def work_on item
        case item.type
        when :reload_credentials
          # magic happens here
        else
          # more magic happens here
        end
      end
    end

    class LockingDispatcher
      def initialize channel, queue
        @channel = channel
        @queue = queue

        @backlog = []
        @channel.subscribe method(:dispatch_with_locking)

        @locked = false
      end

      def dispatch_with_locking item
        if locked?
          @backlog << item
        else
          # You probably want to move the specialization here out into a method or
          # block that's passed into the constructor, to make the lockingdispatcher
          # more of a generic processor
          case item.type
          when :reload_credentials
            lock
            deferrable = CredentialReloader.new(item).start
            deferrable.callback { unlock }
            deferrable.errback  { unlock }
          else
            dispatch_without_locking item
          end
        end
      end

      def dispatch_without_locking item
        @queue << item
      end

      def locked?
        @locked
      end

      def lock
        @locked = true
      end

      def unlock
        @locked = false
        bl = @backlog.dup
        @backlog.clear
        bl.each { |item| dispatch_with_locking item }
      end

    end

    channel = EM::Channel.new
    queue = EM::Queue.new

    dispatcher = LockingDispatcher.new channel, queue

    workers = Array.new(10) { Worker.new queue }

所以,第一个系统的输入是打开的q,但在这个新系统中它是打开的channelqueue仍然用于工作人员之间的工作分配,但在刷新queue凭据操作正在进行时不会填充。不幸的是,由于我没有花更多的时间,所以我没有概括LockingDispatcher它不与项目类型和调度代码耦合CredentialsReloader。我会把它留给你。

您应该在这里注意,虽然我理解您的原始要求,但通常最好放宽这种要求。如果不更改该要求,则基本上无法消除几个突出的问题:

  • 在启动凭据作业之前,系统不会等待执行作业完成
  • 系统将非常糟糕地处理大量的凭证作业——其他可能可以处理的项目将无法处理。
  • 在凭证代码中出现错误的情况下,积压可能会填满 ram 并导致失败。一个简单的超时可能足以避免灾难性影响,如果代码是可中止的,并且后续消息可以充分处理以避免进一步的死锁。

听起来您实际上对系统中有一些用户标识的概念。如果您仔细考虑您的要求,您可能只需要积压与凭据处于刷新状态的用户 ID 相关的项目。这是一个不同的问题,涉及不同类型的调度。为这些用户尝试锁定积压的哈希,在凭证完成时回调以将这些积压排入工作人员,或一些类似的安排。

祝你好运!

于 2012-09-09T18:44:29.813 回答