
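How do I handle timeouts in an EventMachine-based HTTP server? I basically put the HTTP request information onto a queue that processes it, and the processing may or may not end up invoking a callback. I can set a timeout, but I haven't figured out how to add a timeout handler or timeout callback.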

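I've looked through the docs but haven't managed to glean anything useful from them. Putting the logic in the unbind method obviously doesn't work, because by the time unbind is called the request has already finished, and adding an EM::error_handler next to the callback creation code doesn't work either.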

I would like to catch the timeout event and return a specific JSON response when it occurs.

Here's my code, an HTTP request handler:

class HTTPRequestHandler  < EventMachine::Connection

  def initialize(s,q,h)
    @tcpserver = s
    @queue = q
    @callback_hash = h
    self.comm_inactivity_timeout = API_REQUEST_TIMEOUT

  end

  def post_init
      @parser = RequestParser.new
  end

  def receive_data(data)
    handle_http_request if @parser.parse(data)
  end

  def parse_query_parms(query_str)
    begin
      rethash = {}
      query_arr = query_str.split(/&/)
      query_arr.each { |element|
        e_arr = element.split(/\=/)
        rethash[e_arr[0]] = e_arr[1]
      }
      return rethash
    rescue
      return nil
    end
  end

  def handle_http_request
    result = parse_query_parms(@parser.env["QUERY_STRING"]) # hash
    if result
      if result.has_key?('id') and result.has_key?('rid') and result.has_key?('json')
        puts result

        # Callback to handle this
         cb = EM.Callback{ |rid,rtime,msg|
           data = "{\"rid\":\"#{rid}\",\"rtime\":\"#{rtime}\",\"msg\":#{msg}}"
           send_data("HTTP/1.1 200 OK\r\n")
           send_data("Content-Type: application/json\r\n")
           send_data("Content-Length: #{data.bytesize}\r\n")
           send_data("\r\n")
           send_data(data)
           close_connection_after_writing
         }

         # Add callback to hash
         @callback_hash[result['rid']] = cb

         # Decode the URL-encoded JSON parameter
         json_from_api = result['json']
         json_from_api = URI.decode(json_from_api)

         # Push request onto queue
         qreq=QueuedRequest.new(result['id'],json_from_api)
         @queue.push(qreq)

      else
        data = "{\"success\":\"false\",\"response\":\"request needs id, rid, json parameters\"}"
        send_data("HTTP/1.1 200 OK\r\n")
        send_data("Content-Type: application/json\r\n")
        send_data("Content-Length: #{data.bytesize}\r\n")
        send_data("\r\n")
        send_data("#{data}")
        close_connection_after_writing
      end
    else
      data = "{\"success\":\"false\",\"response\":\"unable to parse parameters\"}"
      send_data("HTTP/1.1 200 OK\r\n")
      send_data("Content-Type: application/json\r\n")
      send_data("Content-Length: #{data.bytesize}\r\n")
      send_data("\r\n")
      send_data("#{data}")
      close_connection_after_writing
    end
  end

end
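
A side note on the query parsing above: `URI.decode` was removed in Ruby 3.0, and splitting on `=` drops part of any value that itself contains `=`. A minimal sketch of an alternative, assuming the standard library's `URI.decode_www_form` is acceptable here (the method name `parse_query_params` is hypothetical, not part of the code above):

```ruby
require 'uri'

# Hypothetical replacement for parse_query_parms.
# Returns a Hash of percent-decoded parameters, or nil when there is no
# query string or URI.decode_www_form rejects the input.
def parse_query_params(query_str)
  return nil if query_str.nil?
  URI.decode_www_form(query_str).to_h
rescue ArgumentError
  nil
end

params = parse_query_params("id=7&rid=42&json=%7B%22a%22%3A1%7D")
puts params.inspect
```

With this approach the `json` value arrives already decoded, so the separate decode step before queueing would no longer be needed.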

And the main loop, where we initialize everything and process the queue:

EM.synchrony do
  h = {} # map of rids -> callbacks for requests

  # Initialize TCP and HTTP servers
  q = EM::Queue.new # Queue of messages from HTTP Server
  s = TCPProxyServer.new(h)
  EM.start_server(LISTEN_HOST_CLIENT, LISTEN_PORT_API, HTTPRequestHandler, s, q, h)
  s.start
  puts "Server starting (http and tcp)"

  # process queue of messages coming in from API (recursive)
  process_queue = Proc.new do |qreq| 
    # Our functions
    @operation = lambda do
      puts qreq
      begin
        # Send data to channel
        if s.connections_plug[qreq.id]
          s.connections_plug[qreq.id].send_data(qreq.json)
        else
          return "unable to find id:#{qreq.id} in connection"
        end
      rescue Exception=>e
        puts "Unable to send process queued request! #{e}"
      end     
    end
    @callback = lambda { |result| }

    EM::defer(@operation,@callback) 
    EM.next_tick{ q.pop(&process_queue) }
  end
  q.pop(&process_queue)
end

1 Answer


As suggested by the EventMachine group, I attached a timer to my HTTP request:

class HTTPRequestHandler  < EventMachine::Connection
  def initialize(s,q,h)
    @tcpserver = s
    @queue = q
    @callback_hash = h
    #self.comm_inactivity_timeout = API_REQUEST_TIMEOUT # handled using one off timer
  end

  def post_init
    @parser = RequestParser.new

    # Use timer to handle timeout
    @timer = EventMachine::Timer.new API_REQUEST_TIMEOUT, proc {
      data = {:err => "timeout"}
      data = data.to_json
      send_data("HTTP/1.1 200 OK\r\n")
      send_data("Content-Type: application/json\r\n")
      send_data("Content-Length: #{data.bytesize}\r\n")
      send_data("\r\n")
      send_data("#{data}")
      close_connection_after_writing
    }
  end

  def unbind
    @timer.cancel()
  end

  def receive_data(data)
  end

end
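
The same five `send_data` calls appear in the timer block and in every branch of the original handler. As a rough sketch, a small helper could build the whole response as one string. The name `json_response` and its `status` parameter are assumptions for illustration, not part of EventMachine or the code above:

```ruby
require 'json'

# Hypothetical helper: builds a complete HTTP/1.1 response with a JSON body.
# payload is any object that responds to to_json; status is the status line
# text and defaults to the "200 OK" used throughout the code above.
def json_response(payload, status = "200 OK")
  body = payload.to_json
  [
    "HTTP/1.1 #{status}",
    "Content-Type: application/json",
    "Content-Length: #{body.bytesize}",
    "", # blank line separating headers from body
    body
  ].join("\r\n")
end

timeout_response = json_response({ :err => "timeout" })
puts timeout_response
```

Inside the timer block this would reduce to `send_data(json_response(:err => "timeout"))` followed by `close_connection_after_writing`. The timer is still cancelled in `unbind`, so it only fires for requests that have not been answered in time.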
answered 2012-07-07T21:23:54.203